iota_graphql_rpc/types/
object.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt::Write,
8};
9
10use async_graphql::{
11    connection::{Connection, CursorType, Edge},
12    dataloader::Loader,
13    *,
14};
15use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
16use iota_indexer::{
17    models::objects::{StoredHistoryObject, StoredObject},
18    schema::{objects, objects_history, objects_version},
19    types::{ObjectStatus as NativeObjectStatus, OwnerType},
20};
21use iota_types::{
22    TypeTag,
23    object::{
24        MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
25        bounded_visitor::BoundedVisitor,
26    },
27};
28use move_core_types::{
29    annotated_value::{MoveStruct, MoveTypeLayout},
30    language_storage::StructTag,
31};
32use serde::{Deserialize, Serialize};
33
34use crate::{
35    connection::ScanConnection,
36    consistency::{Checkpointed, View, build_objects_query},
37    data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
38    error::Error,
39    filter, or_filter,
40    raw_query::RawQuery,
41    types::{
42        available_range::AvailableRange,
43        balance::{self, Balance},
44        base64::Base64,
45        big_int::BigInt,
46        coin::Coin,
47        coin_metadata::CoinMetadata,
48        cursor::{self, Page, RawPaginated, ScanLimited, Target},
49        digest::Digest,
50        display::{Display, DisplayEntry},
51        dynamic_field::{DynamicField, DynamicFieldName},
52        intersect,
53        iota_address::{IotaAddress, addr},
54        iota_names_registration::{NameFormat, NameRegistration},
55        move_object::MoveObject,
56        move_package::MovePackage,
57        owner::{Owner, OwnerImpl},
58        stake::StakedIota,
59        transaction_block,
60        transaction_block::{TransactionBlock, TransactionBlockFilter},
61        type_filter::{ExactTypeFilter, TypeFilter},
62        uint53::UInt53,
63    },
64};
65
/// An object as represented by the GraphQL service, together with the
/// consistency information (checkpoint and root version) needed to resolve
/// nested fields against the same view of the chain.
#[derive(Clone, Debug)]
pub(crate) struct Object {
    /// The ID (address) of the object.
    pub address: IotaAddress,
    /// Where the object's data came from, and how much of it is available.
    /// See [`ObjectKind`].
    pub kind: ObjectKind,
    /// The checkpoint sequence number at which this was viewed at.
    pub checkpoint_viewed_at: u64,
    /// Root parent object version for dynamic fields.
    ///
    /// This enables consistent dynamic field reads in the case of chained
    /// dynamic object fields, e.g., `Parent -> DOF1 -> DOF2`. In such
    /// cases, the object versions may end up like `Parent >= DOF1, DOF2`
    /// but `DOF1 < DOF2`. Thus, database queries for dynamic fields must
    /// bound the object versions by the version of the root object of the tree.
    ///
    /// Essentially, lamport timestamps of objects are updated for all top-level
    /// mutable objects provided as inputs to a transaction as well as any
    /// mutated dynamic child objects. However, any dynamic child objects
    /// that were loaded but not actually mutated don't end up having
    /// their versions updated.
    root_version: u64,
}
87
/// Type to implement GraphQL fields that are shared by all Objects.
///
/// It is a thin wrapper around a borrowed [`Object`]; `Object`'s own resolvers
/// delegate to it (e.g. `ObjectImpl(self).version()`).
pub(crate) struct ObjectImpl<'o>(pub &'o Object);
90
/// Where an [`Object`]'s data was obtained from, which also determines how
/// much of the object is available to resolvers.
#[derive(Clone, Debug)]
// The lint is expected to fire here: the variants carrying a `NativeObject`
// are presumably much larger than the `u64`-only variant.
#[expect(clippy::large_enum_variant)]
pub(crate) enum ObjectKind {
    /// An object loaded from serialized data, such as the contents of a
    /// transaction that hasn't been indexed yet.
    NotIndexed(NativeObject),
    /// An object fetched from the index.
    Indexed(NativeObject, StoredHistoryObject),
    /// The object is wrapped or deleted and only partial information can be
    /// loaded from the indexer. The `u64` is the version of the object.
    WrappedOrDeleted(u64),
}
103
// GraphQL-facing counterpart of the internal `ObjectKind` enum: same three
// variants, but without any payload. It is exposed in the schema under the
// name `ObjectKind` (see the `graphql(name = ...)` attribute below).
//
// NOTE: `///` comments on the variants are surfaced as schema descriptions by
// async-graphql, so keep them user-facing.
#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
#[graphql(name = "ObjectKind")]
pub enum ObjectStatus {
    /// The object is loaded from serialized data, such as the contents of a
    /// transaction that hasn't been indexed yet.
    NotIndexed,
    /// The object is fetched from the index.
    Indexed,
    /// The object is deleted or wrapped and only partial information can be
    /// loaded from the indexer.
    WrappedOrDeleted,
}
116
// GraphQL input type identifying one specific object by the triple of its
// address, version and digest.
//
// NOTE: `///` comments on the fields are surfaced as schema descriptions by
// async-graphql, so keep them user-facing.
#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
pub(crate) struct ObjectRef {
    /// ID of the object.
    pub address: IotaAddress,
    /// Version or sequence number of the object.
    pub version: UInt53,
    /// Digest of the object.
    pub digest: Digest,
}
126
// GraphQL input type used to narrow down object queries. Every field is
// optional; `Default` yields a filter with all fields unset.
//
// NOTE: the `///` comments on this type and its fields are surfaced as schema
// descriptions by async-graphql, so keep them user-facing.
/// Constrains the set of objects returned. All filters are optional, and the
/// resulting set of objects are ones whose
///
/// - Type matches the `type` filter,
/// - AND, whose owner matches the `owner` filter,
/// - AND, whose ID is in `objectIds` OR whose ID and version is in
///   `objectKeys`.
#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectFilter {
    /// Filter objects by their type's `package`, `package::module`, or their
    /// fully qualified type name.
    ///
    /// Generic types can be queried by either the generic type name, e.g.
    /// `0x2::coin::Coin`, or by the full type name, such as
    /// `0x2::coin::Coin<0x2::iota::IOTA>`.
    // The trailing underscore avoids the Rust keyword `type`.
    pub type_: Option<TypeFilter>,

    /// Filter for live objects by their current owners.
    pub owner: Option<IotaAddress>,

    /// Filter for live objects by their IDs.
    pub object_ids: Option<Vec<IotaAddress>>,

    /// Filter for live or potentially historical objects by their ID and
    /// version.
    pub object_keys: Option<Vec<ObjectKey>>,
}
154
// GraphQL input type naming an object at one exact version; used by
// `ObjectFilter::object_keys`.
#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectKey {
    // ID of the object.
    pub object_id: IotaAddress,
    // Version of the object to match.
    pub version: UInt53,
}
160
// GraphQL union over the four ownership kinds an object can have. The `///`
// comment below is surfaced as the union's schema description.
/// The object's owner type: Immutable, Shared, Parent, or Address.
#[derive(Union, Clone)]
pub(crate) enum ObjectOwner {
    Immutable(Immutable),
    Shared(Shared),
    // `Parent` holds an `Option<Object>`; presumably boxed to keep this enum
    // small — TODO confirm.
    Parent(Box<Parent>),
    Address(AddressOwner),
}
169
/// An immutable object is an object that can't be mutated, transferred, or
/// deleted. Immutable objects have no owner, so anyone can use them.
#[derive(SimpleObject, Clone)]
pub(crate) struct Immutable {
    // Placeholder field exposed as `_`; it carries no data (it is always set
    // to `None` where this type is constructed in this file). Presumably
    // present because a GraphQL object type needs at least one field.
    #[graphql(name = "_")]
    dummy: Option<bool>,
}
177
/// A shared object is an object that is shared using the
/// 0x2::transfer::share_object function. Unlike owned objects, once an object
/// is shared, it stays mutable and is accessible by anyone.
#[derive(SimpleObject, Clone)]
pub(crate) struct Shared {
    // Taken from the `initial_shared_version` of the native `Shared` owner.
    initial_shared_version: UInt53,
}
185
/// If the object's owner is a Parent, this object is part of a dynamic field
/// (it is the value of the dynamic field, or the intermediate Field object
/// itself). Also note that if the owner is a parent, then it's guaranteed to be
/// an object.
#[derive(SimpleObject, Clone)]
pub(crate) struct Parent {
    // The owning object. `None` when the parent lookup failed or found
    // nothing (the resolver discards lookup errors).
    parent: Option<Object>,
}
194
/// An address-owned object is owned by a specific 32-byte address that is
/// either an account address (derived from a particular signature scheme) or
/// an object ID. An address-owned object is accessible only to its owner and no
/// others.
#[derive(SimpleObject, Clone)]
pub(crate) struct AddressOwner {
    // The owning address, wrapped as the generic `Owner` GraphQL type.
    owner: Option<Owner>,
}
203
/// Filter for a point query of an Object.
///
/// Each variant describes a different way of picking which version of the
/// object to fetch.
pub(crate) enum ObjectLookup {
    /// Fetch the latest version of the object as of a given checkpoint.
    LatestAt {
        /// The checkpoint sequence number at which this was viewed at.
        checkpoint_viewed_at: u64,
    },

    /// Fetch the latest version of a child object that is bounded by the
    /// version of its parent.
    UnderParent {
        /// The parent version to be used as an upper bound for the query. Look
        /// for the latest version of a child object whose version is
        /// less than or equal to this upper bound.
        parent_version: u64,
        /// The checkpoint sequence number at which this was viewed at.
        checkpoint_viewed_at: u64,
    },

    /// Fetch the object at one exact version, as seen from a given checkpoint.
    VersionAt {
        /// The exact version of the object to be fetched.
        version: u64,
        /// The checkpoint sequence number at which this was viewed at.
        checkpoint_viewed_at: u64,
    },

    /// Variant analogous to [`VersionAt`](Self::VersionAt) but for optimistic
    /// transactions, using the most recent not checkpointed data,
    /// not bound by any checkpoint sequence number.
    OptimisticVersion {
        /// The exact version of the object to be fetched.
        version: u64,
    },
}
235
/// Pagination cursor for objects: a BCS-encoded [`HistoricalObjectCursor`].
pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;
237
/// The inner struct for the `Object`'s cursor. The `object_id` is used as the
/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
/// the cursor.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct HistoricalObjectCursor {
    /// The bytes of the object's ID. Serialized under the short name `o`.
    #[serde(rename = "o")]
    object_id: Vec<u8>,
    /// The checkpoint sequence number this was viewed at. Serialized under the
    /// short name `c`.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
249
250/// Interface implemented by on-chain values that are addressable by an ID (also
251/// referred to as its address). This includes Move objects and packages.
252#[expect(clippy::duplicated_attributes)]
253#[derive(Interface)]
254#[graphql(
255    name = "IObject",
256    field(name = "version", ty = "UInt53"),
257    field(
258        name = "status",
259        ty = "ObjectStatus",
260        desc = r#"
261            The current status of the object as read from the off-chain store. The
262            possible states are:
263            - NOT_INDEXED: The object is loaded from serialized data, such as the
264            contents of a genesis or system package upgrade transaction.
265            - INDEXED: The object is retrieved from the off-chain index and
266            represents the most recent or historical state of the object.
267            - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
268            information can be loaded.
269        "#
270    ),
271    field(
272        name = "digest",
273        ty = "Option<String>",
274        desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
275                string."
276    ),
277    field(
278        name = "owner",
279        ty = "Option<ObjectOwner>",
280        desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
281                Immutable and Shared Objects do not have owners."
282    ),
283    field(
284        name = "previous_transaction_block",
285        ty = "Option<TransactionBlock>",
286        desc = "The transaction block that created this version of the object."
287    ),
288    field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
289    field(
290        name = "received_transaction_blocks",
291        arg(name = "first", ty = "Option<u64>"),
292        arg(name = "after", ty = "Option<transaction_block::Cursor>"),
293        arg(name = "last", ty = "Option<u64>"),
294        arg(name = "before", ty = "Option<transaction_block::Cursor>"),
295        arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
296        arg(name = "scan_limit", ty = "Option<u64>"),
297        ty = "ScanConnection<String, TransactionBlock>",
298        desc = "The transaction blocks that sent objects to this object."
299    ),
300    field(
301        name = "bcs",
302        ty = "Option<Base64>",
303        desc = "The Base64-encoded BCS serialization of the object's content."
304    )
305)]
306pub(crate) enum IObject {
307    Object(Object),
308    MovePackage(MovePackage),
309    MoveObject(MoveObject),
310    Coin(Coin),
311    CoinMetadata(CoinMetadata),
312    StakedIota(StakedIota),
313}
314
/// `DataLoader` key for fetching an `Object` at a specific version, constrained
/// by a consistency cursor (if that version was created after the checkpoint
/// the query is viewing at, then it will fail).
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct HistoricalKey {
    /// ID of the object to fetch.
    id: IotaAddress,
    /// The exact version of the object to fetch.
    version: u64,
    /// The checkpoint sequence number the query is viewing at.
    checkpoint_viewed_at: u64,
}
324
/// `DataLoader` key for fetching objects that haven't been checkpointed yet.
/// This is used specifically for loading objects
/// that are part of optimistic transaction effects.
///
/// Unlike [`HistoricalKey`], it carries no checkpoint bound.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct OptimisticKey {
    /// ID of the object to fetch.
    id: IotaAddress,
    /// The exact version of the object to fetch.
    version: u64,
}
333
/// `DataLoader` key for fetching the latest version of an object whose parent
/// object has version `parent_version`, as of `checkpoint_viewed_at`. This
/// look-up can fail to find a valid object if the key is not self-consistent,
/// for example if the `parent_version` is set to a higher version
/// than the object's actual parent as of `checkpoint_viewed_at`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct ParentVersionKey {
    /// ID of the (child) object to fetch.
    id: IotaAddress,
    /// Version of the parent object, used to bound the child's version.
    parent_version: u64,
    /// The checkpoint sequence number the query is viewing at.
    checkpoint_viewed_at: u64,
}
345
/// `DataLoader` key for fetching the latest version of an object as of a given
/// checkpoint.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestAtKey {
    /// ID of the object to fetch.
    id: IotaAddress,
    /// The checkpoint sequence number the query is viewing at.
    checkpoint_viewed_at: u64,
}
353
354/// An object in IOTA is a package (set of Move bytecode modules) or object
355/// (typed data structure with fields) with additional metadata detailing its
356/// id, version, transaction digest, owner field indicating how this object can
357/// be accessed.
358#[Object]
359impl Object {
360    pub(crate) async fn address(&self) -> IotaAddress {
361        OwnerImpl::from(self).address().await
362    }
363
364    /// Objects owned by this object, optionally `filter`-ed.
365    pub(crate) async fn objects(
366        &self,
367        ctx: &Context<'_>,
368        first: Option<u64>,
369        after: Option<Cursor>,
370        last: Option<u64>,
371        before: Option<Cursor>,
372        filter: Option<ObjectFilter>,
373    ) -> Result<Connection<String, MoveObject>> {
374        OwnerImpl::from(self)
375            .objects(ctx, first, after, last, before, filter)
376            .await
377    }
378
379    /// Total balance of all coins with marker type owned by this object. If
380    /// type is not supplied, it defaults to `0x2::iota::IOTA`.
381    pub(crate) async fn balance(
382        &self,
383        ctx: &Context<'_>,
384        type_: Option<ExactTypeFilter>,
385    ) -> Result<Option<Balance>> {
386        OwnerImpl::from(self).balance(ctx, type_).await
387    }
388
389    /// The balances of all coin types owned by this object.
390    pub(crate) async fn balances(
391        &self,
392        ctx: &Context<'_>,
393        first: Option<u64>,
394        after: Option<balance::Cursor>,
395        last: Option<u64>,
396        before: Option<balance::Cursor>,
397    ) -> Result<Connection<String, Balance>> {
398        OwnerImpl::from(self)
399            .balances(ctx, first, after, last, before)
400            .await
401    }
402
403    /// The coin objects for this object.
404    ///
405    /// `type` is a filter on the coin's type parameter, defaulting to
406    /// `0x2::iota::IOTA`.
407    pub(crate) async fn coins(
408        &self,
409        ctx: &Context<'_>,
410        first: Option<u64>,
411        after: Option<Cursor>,
412        last: Option<u64>,
413        before: Option<Cursor>,
414        type_: Option<ExactTypeFilter>,
415    ) -> Result<Connection<String, Coin>> {
416        OwnerImpl::from(self)
417            .coins(ctx, first, after, last, before, type_)
418            .await
419    }
420
421    /// The `0x3::staking_pool::StakedIota` objects owned by this object.
422    pub(crate) async fn staked_iotas(
423        &self,
424        ctx: &Context<'_>,
425        first: Option<u64>,
426        after: Option<Cursor>,
427        last: Option<u64>,
428        before: Option<Cursor>,
429    ) -> Result<Connection<String, StakedIota>> {
430        OwnerImpl::from(self)
431            .staked_iotas(ctx, first, after, last, before)
432            .await
433    }
434
435    /// The name explicitly configured as the default name pointing to this
436    /// address.
437    pub(crate) async fn iota_names_default_name(
438        &self,
439        ctx: &Context<'_>,
440        format: Option<NameFormat>,
441    ) -> Result<Option<String>> {
442        OwnerImpl::from(self)
443            .iota_names_default_name(ctx, format)
444            .await
445    }
446
447    /// The NameRegistration NFTs owned by this address. These grant the
448    /// owner the capability to manage the associated name.
449    pub(crate) async fn iota_names_registrations(
450        &self,
451        ctx: &Context<'_>,
452        first: Option<u64>,
453        after: Option<Cursor>,
454        last: Option<u64>,
455        before: Option<Cursor>,
456    ) -> Result<Connection<String, NameRegistration>> {
457        OwnerImpl::from(self)
458            .iota_names_registrations(ctx, first, after, last, before)
459            .await
460    }
461
462    pub(crate) async fn version(&self) -> UInt53 {
463        ObjectImpl(self).version().await
464    }
465
466    /// The current status of the object as read from the off-chain store. The
467    /// possible states are:
468    /// - NOT_INDEXED: The object is loaded from serialized data, such as the
469    ///   contents of a genesis or system package upgrade transaction.
470    /// - INDEXED: The object is retrieved from the off-chain index and
471    ///   represents the most recent or historical state of the object.
472    /// - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
473    ///   information can be loaded.
474    pub(crate) async fn status(&self) -> ObjectStatus {
475        ObjectImpl(self).status().await
476    }
477
478    /// 32-byte hash that identifies the object's current contents, encoded as a
479    /// Base58 string.
480    pub(crate) async fn digest(&self) -> Option<String> {
481        ObjectImpl(self).digest().await
482    }
483
484    /// The owner type of this object: Immutable, Shared, Parent, Address
485    /// Immutable and Shared Objects do not have owners.
486    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
487        ObjectImpl(self).owner(ctx).await
488    }
489
490    /// The transaction block that created this version of the object.
491    pub(crate) async fn previous_transaction_block(
492        &self,
493        ctx: &Context<'_>,
494    ) -> Result<Option<TransactionBlock>> {
495        ObjectImpl(self).previous_transaction_block(ctx).await
496    }
497
498    /// The amount of IOTA we would rebate if this object gets deleted or
499    /// mutated. This number is recalculated based on the present storage
500    /// gas price.
501    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
502        ObjectImpl(self).storage_rebate().await
503    }
504
505    /// The transaction blocks that sent objects to this object.
506    ///
507    /// `scanLimit` restricts the number of candidate transactions scanned when
508    /// gathering a page of results. It is required for queries that apply
509    /// more than two complex filters (on function, kind, sender, recipient,
510    /// input object, changed object, or ids), and can be at most
511    /// `serviceConfig.maxScanLimit`.
512    ///
513    /// When the scan limit is reached the page will be returned even if it has
514    /// fewer than `first` results when paginating forward (`last` when
515    /// paginating backwards). If there are more transactions to scan,
516    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
517    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
518    /// to the last transaction that was scanned as opposed to the last (or
519    /// first) transaction in the page.
520    ///
521    /// Requesting the next (or previous) page after this cursor will resume the
522    /// search, scanning the next `scanLimit` many transactions in the
523    /// direction of pagination, and so on until all transactions in the
524    /// scanning range have been visited.
525    ///
526    /// By default, the scanning range includes all transactions known to
527    /// GraphQL, but it can be restricted by the `after` and `before`
528    /// cursors, and the `beforeCheckpoint`, `afterCheckpoint` and
529    /// `atCheckpoint` filters.
530    pub(crate) async fn received_transaction_blocks(
531        &self,
532        ctx: &Context<'_>,
533        first: Option<u64>,
534        after: Option<transaction_block::Cursor>,
535        last: Option<u64>,
536        before: Option<transaction_block::Cursor>,
537        filter: Option<TransactionBlockFilter>,
538        scan_limit: Option<u64>,
539    ) -> Result<ScanConnection<String, TransactionBlock>> {
540        ObjectImpl(self)
541            .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
542            .await
543    }
544
545    /// The Base64-encoded BCS serialization of the object's content.
546    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
547        ObjectImpl(self).bcs().await
548    }
549
550    /// The set of named templates defined on-chain for the type of this object,
551    /// to be handled off-chain. The server substitutes data from the object
552    /// into these templates to generate a display string per template.
553    async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
554        ObjectImpl(self).display(ctx).await
555    }
556
557    /// Access a dynamic field on an object using its name. Names are arbitrary
558    /// Move values whose type have `copy`, `drop`, and `store`, and are
559    /// specified using their type, and their BCS contents, Base64 encoded.
560    ///
561    /// Dynamic fields on wrapped objects can be accessed by using the same API
562    /// under the Owner type.
563    async fn dynamic_field(
564        &self,
565        ctx: &Context<'_>,
566        name: DynamicFieldName,
567    ) -> Result<Option<DynamicField>> {
568        OwnerImpl::from(self)
569            .dynamic_field(ctx, name, Some(self.root_version()))
570            .await
571    }
572
573    /// Access a dynamic object field on an object using its name. Names are
574    /// arbitrary Move values whose type have `copy`, `drop`, and `store`,
575    /// and are specified using their type, and their BCS contents, Base64
576    /// encoded. The value of a dynamic object field can also be accessed
577    /// off-chain directly via its address (e.g. using `Query.object`).
578    ///
579    /// Dynamic fields on wrapped objects can be accessed by using the same API
580    /// under the Owner type.
581    async fn dynamic_object_field(
582        &self,
583        ctx: &Context<'_>,
584        name: DynamicFieldName,
585    ) -> Result<Option<DynamicField>> {
586        OwnerImpl::from(self)
587            .dynamic_object_field(ctx, name, Some(self.root_version()))
588            .await
589    }
590
591    /// The dynamic fields and dynamic object fields on an object.
592    ///
593    /// Dynamic fields on wrapped objects can be accessed by using the same API
594    /// under the Owner type.
595    async fn dynamic_fields(
596        &self,
597        ctx: &Context<'_>,
598        first: Option<u64>,
599        after: Option<Cursor>,
600        last: Option<u64>,
601        before: Option<Cursor>,
602    ) -> Result<Connection<String, DynamicField>> {
603        OwnerImpl::from(self)
604            .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
605            .await
606    }
607
608    /// Attempts to convert the object into a MoveObject
609    async fn as_move_object(&self) -> Option<MoveObject> {
610        MoveObject::try_from(self).ok()
611    }
612
613    /// Attempts to convert the object into a MovePackage
614    async fn as_move_package(&self) -> Option<MovePackage> {
615        MovePackage::try_from(self).ok()
616    }
617}
618
619impl ObjectImpl<'_> {
620    pub(crate) async fn version(&self) -> UInt53 {
621        self.0.version_impl().into()
622    }
623
624    pub(crate) async fn status(&self) -> ObjectStatus {
625        ObjectStatus::from(&self.0.kind)
626    }
627
628    pub(crate) async fn digest(&self) -> Option<String> {
629        self.0
630            .native_impl()
631            .map(|native| native.digest().base58_encode())
632    }
633
634    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
635        use NativeOwner as O;
636
637        let native = self.0.native_impl()?;
638
639        match native.owner {
640            O::AddressOwner(address) => {
641                let address = IotaAddress::from(address);
642                Some(ObjectOwner::Address(AddressOwner {
643                    owner: Some(Owner {
644                        address,
645                        checkpoint_viewed_at: self.0.checkpoint_viewed_at,
646                        root_version: None,
647                    }),
648                }))
649            }
650            O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
651            O::ObjectOwner(address) => {
652                let parent = Object::query(
653                    ctx,
654                    address.into(),
655                    Object::latest_at(self.0.checkpoint_viewed_at),
656                )
657                .await
658                .ok()
659                .flatten();
660
661                Some(ObjectOwner::Parent(Box::new(Parent { parent })))
662            }
663            O::Shared {
664                initial_shared_version,
665            } => Some(ObjectOwner::Shared(Shared {
666                initial_shared_version: initial_shared_version.value().into(),
667            })),
668        }
669    }
670
671    pub(crate) async fn previous_transaction_block(
672        &self,
673        ctx: &Context<'_>,
674    ) -> Result<Option<TransactionBlock>> {
675        let Some(native) = self.0.native_impl() else {
676            return Ok(None);
677        };
678        let digest = native.previous_transaction;
679
680        TransactionBlock::query(ctx, digest.into(), self.0.checkpoint_viewed_at)
681            .await
682            .extend()
683    }
684
685    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
686        self.0
687            .native_impl()
688            .map(|native| BigInt::from(native.storage_rebate))
689    }
690
691    pub(crate) async fn received_transaction_blocks(
692        &self,
693        ctx: &Context<'_>,
694        first: Option<u64>,
695        after: Option<transaction_block::Cursor>,
696        last: Option<u64>,
697        before: Option<transaction_block::Cursor>,
698        filter: Option<TransactionBlockFilter>,
699        scan_limit: Option<u64>,
700    ) -> Result<ScanConnection<String, TransactionBlock>> {
701        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
702
703        let Some(filter) = filter
704            .unwrap_or_default()
705            .intersect(TransactionBlockFilter {
706                recv_address: Some(self.0.address),
707                ..Default::default()
708            })
709        else {
710            return Ok(ScanConnection::new(false, false));
711        };
712
713        TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
714            .await
715            .extend()
716    }
717
718    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
719        use ObjectKind as K;
720        Ok(match &self.0.kind {
721            K::WrappedOrDeleted(_) => None,
722            // WrappedOrDeleted objects are also read from the historical objects table, and they do
723            // not have a serialized object, so the column is also nullable for stored historical
724            // objects.
725            K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),
726
727            K::NotIndexed(native) => {
728                let bytes = bcs::to_bytes(native)
729                    .map_err(|e| {
730                        Error::Internal(format!(
731                            "Failed to serialize object at {}: {e}",
732                            self.0.address
733                        ))
734                    })
735                    .extend()?;
736                Some(Base64::from(&bytes))
737            }
738        })
739    }
740
741    /// `display` is part of the `IMoveObject` interface, but is implemented on
742    /// `ObjectImpl` to allow for a convenience function on `Object`.
743    pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
744        let Some(native) = self.0.native_impl() else {
745            return Ok(None);
746        };
747
748        let move_object = native
749            .data
750            .try_as_move()
751            .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
752            .extend()?;
753
754        let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
755            .await
756            .extend()?;
757
758        let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
759            .await
760            .extend()?
761        else {
762            return Ok(None);
763        };
764
765        Ok(Some(display.render(&move_struct).extend()?))
766    }
767}
768
769impl Object {
770    /// Construct a GraphQL object from a native object, without its stored
771    /// (indexed) counterpart.
772    ///
773    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
774    /// which this `Object` was constructed in. This is stored on `Object`
775    /// so that when viewing that entity's state, it will be as if it was
776    /// read at the same checkpoint.
777    ///
778    /// `root_version` represents the version of the root object in some nested
779    /// chain of dynamic fields. This should typically be left `None`,
780    /// unless the object(s) being resolved is a dynamic field, or if
781    /// `root_version` has been explicitly set for this object. If None, then
782    /// we use [`version_for_dynamic_fields`] to infer a root version to then
783    /// propagate from this object down to its dynamic fields.
784    pub(crate) fn from_native(
785        address: IotaAddress,
786        native: NativeObject,
787        checkpoint_viewed_at: u64,
788        root_version: Option<u64>,
789    ) -> Object {
790        let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
791        Object {
792            address,
793            kind: ObjectKind::NotIndexed(native),
794            checkpoint_viewed_at,
795            root_version,
796        }
797    }
798
799    pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
800        use ObjectKind as K;
801
802        match &self.kind {
803            K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
804            K::WrappedOrDeleted(_) => None,
805        }
806    }
807
808    pub(crate) fn version_impl(&self) -> u64 {
809        use ObjectKind as K;
810
811        match &self.kind {
812            K::NotIndexed(native) | K::Indexed(native, _) => native.version().value(),
813            K::WrappedOrDeleted(object_version) => *object_version,
814        }
815    }
816
    /// Root parent object version for dynamic fields.
    ///
    /// This is a plain getter for the private `root_version` field; see the
    /// documentation on that field of [`Object`] for details on how the value
    /// is used to bound dynamic field reads.
    pub(crate) fn root_version(&self) -> u64 {
        self.root_version
    }
823
824    /// Query the database for a `page` of objects, optionally `filter`-ed.
825    ///
826    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
827    /// which this page was queried for. Each entity returned in the
828    /// connection will inherit this checkpoint, so that when viewing that
829    /// entity's state, it will be as if it was read at the same checkpoint.
830    pub(crate) async fn paginate(
831        db: &Db,
832        page: Page<Cursor>,
833        filter: ObjectFilter,
834        checkpoint_viewed_at: u64,
835    ) -> Result<Connection<String, Object>, Error> {
836        Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
837    }
838
    /// Query the database for a `page` of some sub-type of Object. The page
    /// uses the bytes of an Object ID and the checkpoint when the query was
    /// made as the cursor, and can optionally be further `filter`-ed. Each
    /// result is converted to the sub-type using the `downcast` function; if
    /// `downcast` fails for any result, the whole query fails with that error.
844    ///
845    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
846    /// which this page was queried for. Each entity returned in the
847    /// connection will inherit this checkpoint, so that when viewing that
848    /// entity's state, it will be as if it was read at the same checkpoint.
849    ///
    /// If the `Page<Cursor>` contains cursors, then this function will defer
    /// to the `checkpoint_viewed_at` in the cursors. Otherwise, it uses the
    /// value from the parameter. This is so that paginated queries are
    /// consistent with the previous query that created the cursor.
    pub(crate) async fn paginate_subtype<T: OutputType>(
        db: &Db,
        page: Page<Cursor>,
        filter: ObjectFilter,
        checkpoint_viewed_at: u64,
        downcast: impl Fn(Object) -> Result<T, Error>,
    ) -> Result<Connection<String, T>, Error> {
        // If cursors are provided, defer to the `checkpoint_viewed_at` in the
        // cursor if they are consistent. Otherwise, use the value from the
        // parameter. This is so that paginated queries are consistent with
        // the previous query that created the cursor.
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

        // The availability check and the page query run inside the same
        // `execute_repeatable` call. The closure yields `None` when
        // `AvailableRange::result` finds no range for the checkpoint, which is
        // reported to the client below.
        let Some((prev, next, results)) = db
            .execute_repeatable(move |conn| {
                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
                    return Ok::<_, diesel::result::Error>(None);
                };

                Ok(Some(page.paginate_raw_query::<StoredHistoryObject>(
                    conn,
                    checkpoint_viewed_at,
                    objects_query(&filter, range, &page),
                )?))
            })
            .await?
        else {
            return Err(Error::Client(
                "Requested data is outside the available range".to_string(),
            ));
        };

        // `prev` and `next` are passed straight to `Connection::new`
        // (presumably the has-previous/has-next page flags).
        let mut conn: Connection<String, T> = Connection::new(prev, next);

        for stored in results {
            // To maintain consistency, the returned cursor should have the same upper-bound
            // as the checkpoint found on the cursor.
            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
            // `None` lets the conversion infer the root version from the object
            // itself.
            let object =
                Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
            // A failed downcast aborts the whole page rather than skipping the
            // entry.
            conn.edges.push(Edge::new(cursor, downcast(object)?));
        }

        Ok(conn)
    }
901
902    /// Look-up the latest version of the object as of a given checkpoint.
903    pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
904        ObjectLookup::LatestAt {
905            checkpoint_viewed_at,
906        }
907    }
908
909    /// Look-up the latest version of an object whose version is less than or
910    /// equal to its parent's version, as of a given checkpoint.
911    pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
912        ObjectLookup::UnderParent {
913            parent_version,
914            checkpoint_viewed_at,
915        }
916    }
917
918    /// Look-up a specific version of the object, as of a given checkpoint.
919    pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
920        ObjectLookup::VersionAt {
921            version,
922            checkpoint_viewed_at,
923        }
924    }
925
926    /// Look-up a specific version of the object from optimistic transactions.
927    pub(crate) fn at_optimistic_version(version: u64) -> ObjectLookup {
928        ObjectLookup::OptimisticVersion { version }
929    }
930
931    pub(crate) async fn query(
932        ctx: &Context<'_>,
933        id: IotaAddress,
934        key: ObjectLookup,
935    ) -> Result<Option<Self>, Error> {
936        let DataLoader(loader) = &ctx.data_unchecked();
937
938        match key {
939            ObjectLookup::VersionAt {
940                version,
941                checkpoint_viewed_at,
942            } => {
943                loader
944                    .load_one(HistoricalKey {
945                        id,
946                        version,
947                        checkpoint_viewed_at,
948                    })
949                    .await
950            }
951
952            ObjectLookup::OptimisticVersion { version } => {
953                loader.load_one(OptimisticKey { id, version }).await
954            }
955
956            ObjectLookup::UnderParent {
957                parent_version,
958                checkpoint_viewed_at,
959            } => {
960                loader
961                    .load_one(ParentVersionKey {
962                        id,
963                        parent_version,
964                        checkpoint_viewed_at,
965                    })
966                    .await
967            }
968
969            ObjectLookup::LatestAt {
970                checkpoint_viewed_at,
971            } => {
972                loader
973                    .load_one(LatestAtKey {
974                        id,
975                        checkpoint_viewed_at,
976                    })
977                    .await
978            }
979        }
980    }
981
982    /// Query for a singleton object identified by its type. Note: the object is
983    /// assumed to be a singleton (we either find at least one object with
984    /// this type and then return it, or return nothing).
985    pub(crate) async fn query_singleton(
986        db: &Db,
987        type_: StructTag,
988        checkpoint_viewed_at: u64,
989    ) -> Result<Option<Object>, Error> {
990        let filter = ObjectFilter {
991            type_: Some(TypeFilter::ByType(type_)),
992            ..Default::default()
993        };
994
995        let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;
996
997        Ok(connection.edges.into_iter().next().map(|edge| edge.node))
998    }
999
1000    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
1001    /// which this `Object` was constructed in. This is stored on `Object`
1002    /// so that when viewing that entity's state, it will be as if it was
1003    /// read at the same checkpoint.
1004    ///
1005    /// `root_version` represents the version of the root object in some nested
1006    /// chain of dynamic fields. This should typically be left `None`,
1007    /// unless the object(s) being resolved is a dynamic field, or if
1008    /// `root_version` has been explicitly set for this object. If None, then
1009    /// we use [`version_for_dynamic_fields`] to infer a root version to then
1010    /// propagate from this object down to its dynamic fields.
1011    pub(crate) fn try_from_stored_history_object(
1012        history_object: StoredHistoryObject,
1013        checkpoint_viewed_at: u64,
1014        root_version: Option<u64>,
1015    ) -> Result<Self, Error> {
1016        let address = addr(&history_object.object_id)?;
1017
1018        let object_status =
1019            NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
1020                Error::Internal(format!(
1021                    "Unknown object status {} for object {} at version {}",
1022                    history_object.object_status, address, history_object.object_version
1023                ))
1024            })?;
1025
1026        match object_status {
1027            NativeObjectStatus::Active => {
1028                let Some(serialized_object) = &history_object.serialized_object else {
1029                    return Err(Error::Internal(format!(
1030                        "Live object {} at version {} cannot have missing serialized_object field",
1031                        address, history_object.object_version
1032                    )));
1033                };
1034
1035                let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
1036                    Error::Internal(format!("Failed to deserialize object {address}"))
1037                })?;
1038
1039                let root_version =
1040                    root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
1041                Ok(Self {
1042                    address,
1043                    kind: ObjectKind::Indexed(native_object, history_object),
1044                    checkpoint_viewed_at,
1045                    root_version,
1046                })
1047            }
1048            NativeObjectStatus::WrappedOrDeleted => Ok(Self {
1049                address,
1050                kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
1051                checkpoint_viewed_at,
1052                root_version: history_object.object_version as u64,
1053            }),
1054        }
1055    }
1056
1057    pub(crate) fn try_from_stored_object(
1058        stored_object: StoredObject,
1059        checkpoint_viewed_at: u64,
1060    ) -> Result<Self, Error> {
1061        let address = addr(&stored_object.object_id)?;
1062
1063        let native_object = bcs::from_bytes(&stored_object.serialized_object)
1064            .map_err(|_| Error::Internal(format!("Failed to deserialize object {address}")))?;
1065
1066        let root_version = version_for_dynamic_fields(&native_object);
1067
1068        let stored_history_like = StoredHistoryObject {
1069            object_id: stored_object.object_id,
1070            object_version: stored_object.object_version,
1071            object_digest: Some(stored_object.object_digest),
1072            object_status: NativeObjectStatus::Active as i16,
1073            checkpoint_sequence_number: checkpoint_viewed_at as i64,
1074            serialized_object: Some(stored_object.serialized_object),
1075            object_type: stored_object.object_type,
1076            object_type_package: stored_object.object_type_package,
1077            object_type_module: stored_object.object_type_module,
1078            object_type_name: stored_object.object_type_name,
1079            owner_type: Some(stored_object.owner_type),
1080            owner_id: stored_object.owner_id,
1081            coin_type: stored_object.coin_type,
1082            coin_balance: stored_object.coin_balance,
1083            df_kind: stored_object.df_kind,
1084        };
1085
1086        Ok(Self {
1087            address,
1088            kind: ObjectKind::Indexed(native_object, stored_history_like),
1089            checkpoint_viewed_at,
1090            root_version,
1091        })
1092    }
1093}
1094
1095/// We're deliberately choosing to use a child object's version as the root
1096/// here, and letting the caller override it with the actual root object's
1097/// version if it has access to it.
1098///
1099/// Using the child object's version as the root means that we're seeing the
1100/// dynamic field tree under this object at the state resulting from the
1101/// transaction that produced this version.
1102///
1103/// See [`Object::root_version`] for more details on parent/child object version
1104/// mechanics.
1105fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
1106    native.as_inner().version().into()
1107}
1108
1109impl ObjectFilter {
1110    /// Try to create a filter whose results are the intersection of objects in
1111    /// `self`'s results and objects in `other`'s results. This may not be
1112    /// possible if the resulting filter is inconsistent in some way (e.g. a
1113    /// filter that requires one field to be two different values
1114    /// simultaneously).
1115    pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
1116        macro_rules! intersect {
1117            ($field:ident, $body:expr) => {
1118                intersect::field(self.$field, other.$field, $body)
1119            };
1120        }
1121
1122        // Treat `object_ids` and `object_keys` as a single filter on IDs, and
1123        // optionally versions, and compute the intersection of that.
1124        let keys = intersect::field(self.keys(), other.keys(), |k, l| {
1125            let mut combined = BTreeMap::new();
1126
1127            for (id, v) in k {
1128                if let Some(w) = l.get(&id).copied() {
1129                    combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
1130                }
1131            }
1132
1133            // If the intersection is empty, it means, there were some ID or Key filters in
1134            // both `self` and `other`, but they don't overlap, so the final
1135            // result is inconsistent.
1136            (!combined.is_empty()).then_some(combined)
1137        })?;
1138
1139        // Extract the ID and Key filters back out. At this point, we know that if there
1140        // were ID/Key filters in both `self` and `other`, then they intersected
1141        // to form a consistent set of constraints, so it is safe to interpret
1142        // the lack of any ID/Key filters respectively as a lack of that kind of
1143        // constraint, rather than a constraint on the empty set.
1144
1145        let object_ids = {
1146            let partition: Vec<_> = keys
1147                .iter()
1148                .flatten()
1149                .filter_map(|(id, v)| v.is_none().then_some(*id))
1150                .collect();
1151
1152            (!partition.is_empty()).then_some(partition)
1153        };
1154
1155        let object_keys = {
1156            let partition: Vec<_> = keys
1157                .iter()
1158                .flatten()
1159                .filter_map(|(id, v)| {
1160                    Some(ObjectKey {
1161                        object_id: *id,
1162                        version: (*v)?.into(),
1163                    })
1164                })
1165                .collect();
1166
1167            (!partition.is_empty()).then_some(partition)
1168        };
1169
1170        Some(Self {
1171            type_: intersect!(type_, TypeFilter::intersect)?,
1172            owner: intersect!(owner, intersect::by_eq)?,
1173            object_ids,
1174            object_keys,
1175        })
1176    }
1177
1178    /// Extract the Object ID and Key filters into one combined map from Object
1179    /// IDs in this filter, to the versions they should have (or None if the
1180    /// filter mentions the ID but no version for it).
1181    fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
1182        if self.object_keys.is_none() && self.object_ids.is_none() {
1183            return None;
1184        }
1185
1186        Some(BTreeMap::from_iter(
1187            self.object_keys
1188                .iter()
1189                .flatten()
1190                .map(|key| (key.object_id, Some(key.version.into())))
1191                // Chain ID filters after Key filters so if there is overlap, we overwrite the key
1192                // filter with the ID filter.
1193                .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
1194        ))
1195    }
1196
1197    /// Applies ObjectFilter to the input `RawQuery` and returns a new
1198    /// `RawQuery`.
1199    pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
1200        // Start by applying the filters on IDs and/or keys because they are combined as
1201        // a disjunction, while the remaining queries are conjunctions.
1202        if let Some(object_ids) = &self.object_ids {
1203            // Maximally strict - match a vec of 0 elements
1204            if object_ids.is_empty() {
1205                query = or_filter!(query, "1=0");
1206            } else {
1207                let mut inner = String::new();
1208                let mut prefix = "object_id IN (";
1209                for id in object_ids {
1210                    // SAFETY: Writing to a `String` cannot fail.
1211                    write!(
1212                        &mut inner,
1213                        "{prefix}'\\x{}'::bytea",
1214                        hex::encode(id.into_vec())
1215                    )
1216                    .unwrap();
1217                    prefix = ", ";
1218                }
1219                inner.push(')');
1220                query = or_filter!(query, inner);
1221            }
1222        }
1223
1224        if let Some(object_keys) = &self.object_keys {
1225            // Maximally strict - match a vec of 0 elements
1226            if object_keys.is_empty() {
1227                query = or_filter!(query, "1=0");
1228            } else {
1229                let mut inner = String::new();
1230                let mut prefix = "(";
1231                for ObjectKey { object_id, version } in object_keys {
1232                    // SAFETY: Writing to a `String` cannot fail.
1233                    write!(
1234                        &mut inner,
1235                        "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
1236                        hex::encode(object_id.into_vec()),
1237                        version
1238                    )
1239                    .unwrap();
1240                    prefix = " OR ";
1241                }
1242                inner.push(')');
1243                query = or_filter!(query, inner);
1244            }
1245        }
1246
1247        if let Some(owner) = self.owner {
1248            query = filter!(
1249                query,
1250                format!(
1251                    "owner_id = '\\x{}'::bytea AND owner_type = {}",
1252                    hex::encode(owner.into_vec()),
1253                    OwnerType::Address as i16
1254                )
1255            );
1256        }
1257
1258        if let Some(type_) = &self.type_ {
1259            return type_.apply_raw(
1260                query,
1261                "object_type",
1262                "object_type_package",
1263                "object_type_module",
1264                "object_type_name",
1265            );
1266        }
1267
1268        query
1269    }
1270
1271    pub(crate) fn has_filters(&self) -> bool {
1272        self != &Default::default()
1273    }
1274}
1275
1276impl HistoricalObjectCursor {
1277    pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
1278        Self {
1279            object_id,
1280            checkpoint_viewed_at,
1281        }
1282    }
1283}
1284
impl Checkpointed for Cursor {
    /// Returns the checkpoint sequence number recorded in this cursor.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
1290
// Empty impl: `Cursor` overrides nothing and relies on whatever `ScanLimited`
// provides by default.
impl ScanLimited for Cursor {}
1292
1293impl RawPaginated<Cursor> for StoredHistoryObject {
1294    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1295        filter!(
1296            query,
1297            format!(
1298                "candidates.object_id >= '\\x{}'::bytea",
1299                hex::encode(cursor.object_id.clone())
1300            )
1301        )
1302    }
1303
1304    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1305        filter!(
1306            query,
1307            format!(
1308                "candidates.object_id <= '\\x{}'::bytea",
1309                hex::encode(cursor.object_id.clone())
1310            )
1311        )
1312    }
1313
1314    fn order(asc: bool, query: RawQuery) -> RawQuery {
1315        if asc {
1316            query.order_by("candidates.object_id ASC")
1317        } else {
1318            query.order_by("candidates.object_id DESC")
1319        }
1320    }
1321}
1322
1323impl Target<Cursor> for StoredHistoryObject {
1324    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1325        Cursor::new(HistoricalObjectCursor::new(
1326            self.object_id.clone(),
1327            checkpoint_viewed_at,
1328        ))
1329    }
1330}
1331
/// Batch loader for objects identified by an exact `(id, version)` pair, as
/// seen from a given checkpoint.
impl Loader<HistoricalKey> for Db {
    type Value = Object;
    type Error = Error;

    /// Fetch all requested `(id, version)` pairs with one query against
    /// `objects_history` joined with `objects_version`.
    ///
    /// Keys with no matching row, or whose row belongs to a checkpoint after
    /// the key's `checkpoint_viewed_at`, are left out of the returned map.
    async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
        use objects_history::dsl as h;
        use objects_version::dsl as v;

        // Nothing to fetch. Returning early also avoids running the query
        // below with no `or_filter` attached.
        if keys.is_empty() {
            return Ok(HashMap::new());
        }

        // De-duplicate the pairs: keys that differ only in
        // `checkpoint_viewed_at` refer to the same row.
        let id_versions: BTreeSet<_> = keys
            .iter()
            .map(|key| (key.id.into_vec(), key.version as i64))
            .collect();

        let objects: Vec<StoredHistoryObject> = self
            .execute(move |conn| {
                conn.results(move || {
                    let mut query = h::objects_history
                        .inner_join(
                            v::objects_version.on(v::cp_sequence_number
                                .eq(h::checkpoint_sequence_number)
                                .and(v::object_id.eq(h::object_id))
                                .and(v::object_version.eq(h::object_version))),
                        )
                        .select(StoredHistoryObject::as_select())
                        .into_boxed();

                    // One `(id = ? AND version = ?)` alternative per requested
                    // pair.
                    for (id, version) in id_versions.iter().cloned() {
                        query =
                            query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
                    }

                    query
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;

        // Index the fetched rows by `(address, version)` so each key can find
        // its row.
        let mut id_version_to_stored = BTreeMap::new();
        for stored in objects {
            let key = (addr(&stored.object_id)?, stored.object_version as u64);
            id_version_to_stored.insert(key, stored);
        }

        let mut result = HashMap::new();
        for key in keys {
            let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
                continue;
            };

            // Filter by key's checkpoint viewed at here. Doing this in memory because it
            // should be quite rare that this query actually filters something,
            // but encoding it in SQL is complicated.
            if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
                continue;
            }

            // The row is cloned because several keys (with different
            // `checkpoint_viewed_at`) may map to the same row.
            let object = Object::try_from_stored_history_object(
                stored.clone(),
                key.checkpoint_viewed_at,
                // This conversion will use the object's own version as the `Object::root_version`.
                None,
            )?;
            result.insert(*key, object);
        }

        Ok(result)
    }
}
1404
/// Batch loader for objects identified by an exact `(id, version)` pair
/// without a checkpoint bound, used for lookups from optimistic transactions.
impl Loader<OptimisticKey> for Db {
    type Value = Object;
    type Error = Error;

    /// Look each `(id, version)` pair up in the `objects` table first, then
    /// fall back to the `HistoricalKey` loader for pairs not found there.
    /// Pairs found in neither place are left out of the returned map.
    async fn load(&self, keys: &[OptimisticKey]) -> Result<HashMap<OptimisticKey, Object>, Error> {
        use objects::dsl as o;

        // Nothing to fetch. Returning early also avoids running the query
        // below with no `or_filter` attached.
        if keys.is_empty() {
            return Ok(HashMap::new());
        }

        // De-duplicated `(id bytes, version)` pairs to query for.
        let id_versions: BTreeSet<_> = keys
            .iter()
            .map(|key| (key.id.into_vec(), key.version as i64))
            .collect();

        let objects: Vec<StoredObject> = self
            .execute(move |conn| {
                conn.results(move || {
                    let mut query = o::objects.select(StoredObject::as_select()).into_boxed();
                    // One `(id = ? AND version = ?)` alternative per requested
                    // pair.
                    for (id, version) in id_versions.iter().cloned() {
                        query =
                            query.or_filter(o::object_id.eq(id).and(o::object_version.eq(version)));
                    }
                    query
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch optimistic objects: {e}")))?;

        let mut result = HashMap::new();
        // Index the fetched rows by `(address, version)`; fails if any stored
        // object ID cannot be converted to an address.
        let id_version_to_stored = objects
            .into_iter()
            .map(|stored| {
                addr(&stored.object_id).map(|id| ((id, stored.object_version as u64), stored))
            })
            .collect::<Result<BTreeMap<_, _>, _>>()?;

        // Collect keys that were not found in objects table
        let mut missing_keys = Vec::new();
        for key in keys {
            if let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) {
                // `u64::MAX` stands in for "no checkpoint bound" on the
                // resulting object.
                let object = Object::try_from_stored_object(stored.clone(), u64::MAX)?;
                result.insert(*key, object);
            } else {
                missing_keys.push(*key);
            }
        }

        // For missing keys, fallback to using the objects_history table
        if !missing_keys.is_empty() {
            // With `checkpoint_viewed_at` set to `u64::MAX`, the historical
            // loader's in-memory checkpoint filter cannot exclude any row.
            let historical_keys: Vec<HistoricalKey> = missing_keys
                .iter()
                .map(|key| HistoricalKey {
                    id: key.id,
                    version: key.version,
                    checkpoint_viewed_at: u64::MAX,
                })
                .collect();

            // The annotated result type selects the `Loader<HistoricalKey>`
            // implementation of `load`.
            let historical_result: HashMap<HistoricalKey, Object> =
                self.load(&historical_keys).await?;

            // Translate the historical keys back into the optimistic keys the
            // caller asked for.
            for (historical_key, object) in historical_result {
                let optimistic_key = OptimisticKey {
                    id: historical_key.id,
                    version: historical_key.version,
                };
                result.insert(optimistic_key, object);
            }
        }

        Ok(result)
    }
}
1480
/// Batch loader for the latest version of an object that is less than or
/// equal to a parent's version, as seen from a given checkpoint.
impl Loader<ParentVersionKey> for Db {
    type Value = Object;
    type Error = Error;

    /// Fetch, for each key, the highest stored version of the object that does
    /// not exceed the key's `parent_version`.
    ///
    /// If that row belongs to a checkpoint after the key's
    /// `checkpoint_viewed_at`, the key is left out of the returned map, as are
    /// keys with no matching row.
    async fn load(
        &self,
        keys: &[ParentVersionKey],
    ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
        // Group keys by checkpoint viewed at and parent version -- we'll issue a
        // separate query for each group.
        #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
        struct GroupKey {
            checkpoint_viewed_at: u64,
            parent_version: u64,
        }

        // Maps each group to the set of object IDs (as raw bytes) requested
        // under it.
        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
        for key in keys {
            let group_key = GroupKey {
                checkpoint_viewed_at: key.checkpoint_viewed_at,
                parent_version: key.parent_version,
            };

            keys_by_cursor_and_parent_version
                .entry(group_key)
                .or_default()
                .insert(key.id.into_vec());
        }

        // Issue concurrent reads for each group of keys.
        let futures = keys_by_cursor_and_parent_version
            .into_iter()
            .map(|(group_key, ids)| {
                self.execute(move |conn| {
                    let stored: Vec<StoredHistoryObject> = conn.results(move || {
                        use objects_history::dsl as h;
                        use objects_version::dsl as v;

                        // `DISTINCT ON (object_id)` combined with ordering by
                        // version descending keeps, per object, only the row
                        // with the highest version not above the parent
                        // version.
                        h::objects_history
                            .inner_join(
                                v::objects_version.on(v::cp_sequence_number
                                    .eq(h::checkpoint_sequence_number)
                                    .and(v::object_id.eq(h::object_id))
                                    .and(v::object_version.eq(h::object_version))),
                            )
                            .select(StoredHistoryObject::as_select())
                            .filter(v::object_id.eq_any(ids.iter().cloned()))
                            .filter(v::object_version.le(group_key.parent_version as i64))
                            .distinct_on(v::object_id)
                            .order_by(v::object_id)
                            .then_order_by(v::object_version.desc())
                            .into_boxed()
                    })?;

                    // Tag every row with its group so the original key can be
                    // rebuilt once the reads have been joined.
                    Ok::<_, diesel::result::Error>(
                        stored
                            .into_iter()
                            .map(|stored| (group_key, stored))
                            .collect::<Vec<_>>(),
                    )
                })
            });

        // Wait for the reads to all finish, and gather them into the result map.
        let groups = futures::future::join_all(futures).await;

        let mut results = HashMap::new();
        for group in groups {
            // A failure in any group fails the whole batch.
            for (group_key, stored) in
                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
            {
                // This particular object is invalid -- it didn't exist at the checkpoint we are
                // viewing at.
                if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
                    continue;
                }

                let object = Object::try_from_stored_history_object(
                    stored,
                    group_key.checkpoint_viewed_at,
                    // If `ParentVersionKey::parent_version` is set, it must have been correctly
                    // propagated from the `Object::root_version` of some object.
                    Some(group_key.parent_version),
                )?;

                let key = ParentVersionKey {
                    id: object.address,
                    checkpoint_viewed_at: group_key.checkpoint_viewed_at,
                    parent_version: group_key.parent_version,
                };

                results.insert(key, object);
            }
        }

        Ok(results)
    }
}
1579
1580impl Loader<LatestAtKey> for Db {
1581    type Value = Object;
1582    type Error = Error;
1583
1584    async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
1585        // Group keys by checkpoint viewed at -- we'll issue a separate query for each
1586        // group.
1587        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1588
1589        for key in keys {
1590            keys_by_cursor_and_parent_version
1591                .entry(key.checkpoint_viewed_at)
1592                .or_default()
1593                .insert(key.id);
1594        }
1595
1596        // Issue concurrent reads for each group of keys.
1597        let futures =
1598            keys_by_cursor_and_parent_version
1599                .into_iter()
1600                .map(|(checkpoint_viewed_at, ids)| {
1601                    self.execute_repeatable(move |conn| {
1602                        let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)?
1603                        else {
1604                            return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
1605                                vec![],
1606                            );
1607                        };
1608
1609                        let filter = ObjectFilter {
1610                            object_ids: Some(ids.iter().cloned().collect()),
1611                            ..Default::default()
1612                        };
1613
1614                        Ok(conn
1615                            .results(move || {
1616                                build_objects_query(
1617                                    View::Consistent,
1618                                    range,
1619                                    &Page::bounded(ids.len() as u64),
1620                                    |q| filter.apply(q),
1621                                    |q| q,
1622                                )
1623                                .into_boxed()
1624                            })?
1625                            .into_iter()
1626                            .map(|r| (checkpoint_viewed_at, r))
1627                            .collect())
1628                    })
1629                });
1630
1631        // Wait for the reads to all finish, and gather them into the result map.
1632        let groups = futures::future::join_all(futures).await;
1633
1634        let mut results = HashMap::new();
1635        for group in groups {
1636            for (checkpoint_viewed_at, stored) in
1637                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1638            {
1639                let object =
1640                    Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
1641
1642                let key = LatestAtKey {
1643                    id: object.address,
1644                    checkpoint_viewed_at,
1645                };
1646
1647                results.insert(key, object);
1648            }
1649        }
1650
1651        Ok(results)
1652    }
1653}
1654
1655impl From<&ObjectKind> for ObjectStatus {
1656    fn from(kind: &ObjectKind) -> Self {
1657        match kind {
1658            ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
1659            ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
1660            ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
1661        }
1662    }
1663}
1664
1665impl From<&Object> for OwnerImpl {
1666    fn from(object: &Object) -> Self {
1667        OwnerImpl {
1668            address: object.address,
1669            checkpoint_viewed_at: object.checkpoint_viewed_at,
1670        }
1671    }
1672}
1673
1674pub(crate) async fn deserialize_move_struct(
1675    move_object: &NativeMoveObject,
1676    resolver: &PackageResolver,
1677) -> Result<(StructTag, MoveStruct), Error> {
1678    let struct_tag = StructTag::from(move_object.type_().clone());
1679    let contents = move_object.contents();
1680    let move_type_layout = resolver
1681        .type_layout(TypeTag::from(struct_tag.clone()))
1682        .await
1683        .map_err(|e| {
1684            Error::Internal(format!(
1685                "Error fetching layout for type {}: {e}",
1686                struct_tag.to_canonical_string(/* with_prefix */ true)
1687            ))
1688        })?;
1689
1690    let MoveTypeLayout::Struct(layout) = move_type_layout else {
1691        return Err(Error::Internal("Object is not a move struct".to_string()));
1692    };
1693
1694    // TODO (annotated-visitor): Use custom visitors for extracting a dynamic field,
1695    // and for creating a GraphQL MoveValue directly (not via an annotated
1696    // visitor).
1697    let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
1698        Error::Internal(format!(
1699            "Error deserializing move struct for type {}: {e}",
1700            struct_tag.to_canonical_string(/* with_prefix */ true)
1701        ))
1702    })?;
1703
1704    Ok((struct_tag, move_struct))
1705}
1706
1707/// Constructs a raw query to fetch objects from the database. Objects are
1708/// filtered out if they satisfy the criteria but have a later version in the
1709/// same checkpoint. If object keys are provided, or no filters are specified at
1710/// all, then this final condition is not applied.
1711fn objects_query(filter: &ObjectFilter, range: AvailableRange, page: &Page<Cursor>) -> RawQuery
1712where
1713{
1714    if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
1715        // If both object IDs and object keys are specified, then we need to query in
1716        // both historical and consistent views, and then union the results.
1717        let ids_only_filter = ObjectFilter {
1718            object_keys: None,
1719            ..filter.clone()
1720        };
1721        let (id_query, id_bindings) = build_objects_query(
1722            View::Consistent,
1723            range,
1724            page,
1725            move |query| ids_only_filter.apply(query),
1726            move |newer| newer,
1727        )
1728        .finish();
1729
1730        let keys_only_filter = ObjectFilter {
1731            object_ids: None,
1732            ..filter.clone()
1733        };
1734        let (key_query, key_bindings) = build_objects_query(
1735            View::Historical,
1736            range,
1737            page,
1738            move |query| keys_only_filter.apply(query),
1739            move |newer| newer,
1740        )
1741        .finish();
1742
1743        RawQuery::new(
1744            format!("SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates",),
1745            id_bindings.into_iter().chain(key_bindings).collect(),
1746        )
1747        .order_by("object_id")
1748        .limit(page.limit() as i64)
1749    } else {
1750        // Only one of object IDs or object keys is specified, or neither are specified.
1751        let view = if filter.object_keys.is_some() || !filter.has_filters() {
1752            View::Historical
1753        } else {
1754            View::Consistent
1755        };
1756
1757        build_objects_query(
1758            view,
1759            range,
1760            page,
1761            move |query| filter.apply(query),
1762            move |newer| newer,
1763        )
1764    }
1765}
1766
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Parses a hex literal into an address; panics if the literal is malformed.
    fn address(hex: &str) -> IotaAddress {
        IotaAddress::from_str(hex).unwrap()
    }

    /// Builds a filter that constrains only object IDs and/or object keys,
    /// leaving every other field at its default.
    fn id_key_filter(
        object_ids: Option<Vec<IotaAddress>>,
        object_keys: Option<Vec<ObjectKey>>,
    ) -> ObjectFilter {
        ObjectFilter {
            object_ids,
            object_keys,
            ..Default::default()
        }
    }

    /// Intersecting a filter with itself is the identity; filters with
    /// different owners have no intersection.
    #[test]
    fn test_owner_filter_intersection() {
        let owned_by = |hex: &str| ObjectFilter {
            owner: Some(address(hex)),
            ..Default::default()
        };

        let f0 = owned_by("0x1");
        let f1 = owned_by("0x2");

        assert_eq!(f0.clone().intersect(f0.clone()), Some(f0.clone()));
        assert_eq!(f0.clone().intersect(f1.clone()), None);
    }

    /// Exercises intersection of filters that combine object IDs and object
    /// keys in various ways.
    #[test]
    fn test_key_filter_intersection() {
        let i1 = address("0x1");
        let i2 = address("0x2");
        let i3 = address("0x3");
        let i4 = address("0x4");

        // The distinct (object, version) keys used by the filters below.
        let i2_v1 = ObjectKey {
            object_id: i2,
            version: 1.into(),
        };
        let i2_v2 = ObjectKey {
            object_id: i2,
            version: 2.into(),
        };
        let i4_v2 = ObjectKey {
            object_id: i4,
            version: 2.into(),
        };

        let f0 = id_key_filter(
            Some(vec![i1, i3]),
            Some(vec![i2_v1.clone(), i4_v2.clone()]),
        );
        let f1 = id_key_filter(Some(vec![i1, i2]), Some(vec![i4_v2.clone()]));
        let f2 = id_key_filter(Some(vec![i1, i3]), None);
        let f3 = id_key_filter(None, Some(vec![i2_v2.clone(), i4_v2.clone()]));

        assert_eq!(
            f0.clone().intersect(f1.clone()),
            Some(id_key_filter(
                Some(vec![i1]),
                Some(vec![i2_v1.clone(), i4_v2.clone()]),
            ))
        );

        assert_eq!(
            f1.clone().intersect(f2.clone()),
            Some(id_key_filter(Some(vec![i1]), None))
        );

        assert_eq!(
            f1.clone().intersect(f3.clone()),
            Some(id_key_filter(
                None,
                Some(vec![i2_v2.clone(), i4_v2.clone()]),
            ))
        );

        // i2 got a conflicting version assignment
        assert_eq!(f0.clone().intersect(f3.clone()), None);

        // No overlap between these two.
        assert_eq!(f2.clone().intersect(f3.clone()), None);
    }
}