iota_graphql_rpc/types/
object.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt::Write,
8};
9
10use async_graphql::{
11    connection::{Connection, CursorType, Edge},
12    dataloader::Loader,
13    *,
14};
15use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
16use iota_indexer::{
17    models::objects::{StoredHistoryObject, StoredObject},
18    schema::{objects, objects_history, objects_version},
19    types::{ObjectStatus as NativeObjectStatus, OwnerType},
20};
21use iota_types::{
22    TypeTag,
23    object::{
24        MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
25        bounded_visitor::BoundedVisitor,
26    },
27};
28use move_core_types::{
29    annotated_value::{MoveStruct, MoveTypeLayout},
30    language_storage::StructTag,
31};
32use serde::{Deserialize, Serialize};
33
34use crate::{
35    config::DEFAULT_PAGE_SIZE,
36    connection::ScanConnection,
37    consistency::{Checkpointed, View, build_objects_query},
38    data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
39    error::Error,
40    filter, or_filter,
41    raw_query::RawQuery,
42    types::{
43        available_range::AvailableRange,
44        balance::{self, Balance},
45        base64::Base64,
46        big_int::BigInt,
47        coin::Coin,
48        coin_metadata::CoinMetadata,
49        cursor::{self, Page, RawPaginated, ScanLimited, Target},
50        digest::Digest,
51        display::{Display, DisplayEntry},
52        dynamic_field::{DynamicField, DynamicFieldName},
53        intersect,
54        iota_address::{IotaAddress, addr},
55        iota_names_registration::{NameFormat, NameRegistration},
56        move_object::MoveObject,
57        move_package::MovePackage,
58        owner::{Owner, OwnerImpl},
59        stake::StakedIota,
60        transaction_block,
61        transaction_block::{TransactionBlock, TransactionBlockFilter},
62        type_filter::{ExactTypeFilter, TypeFilter},
63        uint53::UInt53,
64    },
65};
66
67#[derive(Clone, Debug)]
68pub(crate) struct Object {
69    pub address: IotaAddress,
70    pub kind: ObjectKind,
71    /// The checkpoint sequence number at which this was viewed at.
72    pub checkpoint_viewed_at: u64,
73    /// Root parent object version for dynamic fields.
74    ///
75    /// This enables consistent dynamic field reads in the case of chained
76    /// dynamic object fields, e.g., `Parent -> DOF1 -> DOF2`. In such
77    /// cases, the object versions may end up like `Parent >= DOF1, DOF2`
78    /// but `DOF1 < DOF2`. Thus, database queries for dynamic fields must
79    /// bound the object versions by the version of the root object of the tree.
80    ///
81    /// Essentially, lamport timestamps of objects are updated for all top-level
82    /// mutable objects provided as inputs to a transaction as well as any
83    /// mutated dynamic child objects. However, any dynamic child objects
84    /// that were loaded but not actually mutated don't end up having
85    /// their versions updated.
86    root_version: u64,
87}
88
89/// Type to implement GraphQL fields that are shared by all Objects.
90pub(crate) struct ObjectImpl<'o>(pub &'o Object);
91
92#[derive(Clone, Debug)]
93#[expect(clippy::large_enum_variant)]
94pub(crate) enum ObjectKind {
95    /// An object loaded from serialized data, such as the contents of a
96    /// transaction that hasn't been indexed yet.
97    NotIndexed(NativeObject),
98    /// An object fetched from the index.
99    Indexed(NativeObject, StoredHistoryObject),
100    /// The object is wrapped or deleted and only partial information can be
101    /// loaded from the indexer. The `u64` is the version of the object.
102    WrappedOrDeleted(u64),
103}
104
105#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
106#[graphql(name = "ObjectKind")]
107pub enum ObjectStatus {
108    /// The object is loaded from serialized data, such as the contents of a
109    /// transaction that hasn't been indexed yet.
110    NotIndexed,
111    /// The object is fetched from the index.
112    Indexed,
113    /// The object is deleted or wrapped and only partial information can be
114    /// loaded from the indexer.
115    WrappedOrDeleted,
116}
117
118#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
119pub(crate) struct ObjectRef {
120    /// ID of the object.
121    pub address: IotaAddress,
122    /// Version or sequence number of the object.
123    pub version: UInt53,
124    /// Digest of the object.
125    pub digest: Digest,
126}
127
128/// Constrains the set of objects returned. All filters are optional, and the
129/// resulting set of objects are ones whose
130///
131/// - Type matches the `type` filter,
132/// - AND, whose owner matches the `owner` filter,
133/// - AND, whose ID is in `objectIds` OR whose ID and version is in
134///   `objectKeys`.
135#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
136pub(crate) struct ObjectFilter {
137    /// Filter objects by their type's `package`, `package::module`, or their
138    /// fully qualified type name.
139    ///
140    /// Generic types can be queried by either the generic type name, e.g.
141    /// `0x2::coin::Coin`, or by the full type name, such as
142    /// `0x2::coin::Coin<0x2::iota::IOTA>`.
143    pub type_: Option<TypeFilter>,
144
145    /// Filter for live objects by their current owners.
146    pub owner: Option<IotaAddress>,
147
148    /// Filter for live objects by their IDs.
149    pub object_ids: Option<Vec<IotaAddress>>,
150
151    /// Filter for live or potentially historical objects by their ID and
152    /// version.
153    pub object_keys: Option<Vec<ObjectKey>>,
154}
155
156#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
157pub(crate) struct ObjectKey {
158    pub object_id: IotaAddress,
159    pub version: UInt53,
160}
161
162/// The object's owner type: Immutable, Shared, Parent, or Address.
163#[derive(Union, Clone)]
164pub(crate) enum ObjectOwner {
165    Immutable(Immutable),
166    Shared(Shared),
167    Parent(Box<Parent>),
168    Address(AddressOwner),
169}
170
171/// An immutable object is an object that can't be mutated, transferred, or
172/// deleted. Immutable objects have no owner, so anyone can use them.
173#[derive(SimpleObject, Clone)]
174pub(crate) struct Immutable {
175    #[graphql(name = "_")]
176    dummy: Option<bool>,
177}
178
179/// A shared object is an object that is shared using the
180/// 0x2::transfer::share_object function. Unlike owned objects, once an object
181/// is shared, it stays mutable and is accessible by anyone.
182#[derive(SimpleObject, Clone)]
183pub(crate) struct Shared {
184    initial_shared_version: UInt53,
185}
186
187/// If the object's owner is a Parent, this object is part of a dynamic field
188/// (it is the value of the dynamic field, or the intermediate Field object
189/// itself). Also note that if the owner is a parent, then it's guaranteed to be
190/// an object.
191#[derive(SimpleObject, Clone)]
192pub(crate) struct Parent {
193    parent: Option<Object>,
194}
195
196/// An address-owned object is owned by a specific 32-byte address that is
197/// either an account address (derived from a particular signature scheme) or
198/// an object ID. An address-owned object is accessible only to its owner and no
199/// others.
200#[derive(SimpleObject, Clone)]
201pub(crate) struct AddressOwner {
202    owner: Option<Owner>,
203}
204
205/// Filter for a point query of an Object.
206pub(crate) enum ObjectLookup {
207    LatestAt {
208        /// The checkpoint sequence number at which this was viewed at.
209        checkpoint_viewed_at: u64,
210    },
211
212    UnderParent {
213        /// The parent version to be used as an upper bound for the query. Look
214        /// for the latest version of a child object whose version is
215        /// less than or equal to this upper bound.
216        parent_version: u64,
217        /// The checkpoint sequence number at which this was viewed at.
218        checkpoint_viewed_at: u64,
219    },
220
221    VersionAt {
222        /// The exact version of the object to be fetched.
223        version: u64,
224        /// The checkpoint sequence number at which this was viewed at.
225        checkpoint_viewed_at: u64,
226    },
227
228    /// Variant analogous to [`VersionAt`](Self::VersionAt) but for optimistic
229    /// transactions, using the most recent not checkpointed data,
230    /// not bound by any checkpoint sequence number.
231    OptimisticVersion {
232        /// The exact version of the object to be fetched.
233        version: u64,
234    },
235}
236
237pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;
238
239/// The inner struct for the `Object`'s cursor. The `object_id` is used as the
240/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
241/// the cursor.
242#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
243pub(crate) struct HistoricalObjectCursor {
244    #[serde(rename = "o")]
245    object_id: Vec<u8>,
246    /// The checkpoint sequence number this was viewed at.
247    #[serde(rename = "c")]
248    checkpoint_viewed_at: u64,
249}
250
251/// Interface implemented by on-chain values that are addressable by an ID (also
252/// referred to as its address). This includes Move objects and packages.
253#[expect(clippy::duplicated_attributes)]
254#[derive(Interface)]
255#[graphql(
256    name = "IObject",
257    field(name = "version", ty = "UInt53"),
258    field(
259        name = "status",
260        ty = "ObjectStatus",
261        desc = r#"
262            The current status of the object as read from the off-chain store. The
263            possible states are:
264            - NOT_INDEXED: The object is loaded from serialized data, such as the
265            contents of a genesis or system package upgrade transaction.
266            - INDEXED: The object is retrieved from the off-chain index and
267            represents the most recent or historical state of the object.
268            - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
269            information can be loaded.
270        "#
271    ),
272    field(
273        name = "digest",
274        ty = "Option<String>",
275        desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
276                string."
277    ),
278    field(
279        name = "owner",
280        ty = "Option<ObjectOwner>",
281        desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
282                Immutable and Shared Objects do not have owners."
283    ),
284    field(
285        name = "previous_transaction_block",
286        ty = "Option<TransactionBlock>",
287        desc = "The transaction block that created this version of the object."
288    ),
289    field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
290    field(
291        name = "received_transaction_blocks",
292        arg(name = "first", ty = "Option<u64>"),
293        arg(name = "after", ty = "Option<transaction_block::Cursor>"),
294        arg(name = "last", ty = "Option<u64>"),
295        arg(name = "before", ty = "Option<transaction_block::Cursor>"),
296        arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
297        arg(name = "scan_limit", ty = "Option<u64>"),
298        ty = "ScanConnection<String, TransactionBlock>",
299        desc = "The transaction blocks that sent objects to this object."
300    ),
301    field(
302        name = "bcs",
303        ty = "Option<Base64>",
304        desc = "The Base64-encoded BCS serialization of the object's content."
305    )
306)]
307pub(crate) enum IObject {
308    Object(Object),
309    MovePackage(MovePackage),
310    MoveObject(MoveObject),
311    Coin(Coin),
312    CoinMetadata(CoinMetadata),
313    StakedIota(StakedIota),
314}
315
316/// `DataLoader` key for fetching an `Object` at a specific version, constrained
317/// by a consistency cursor (if that version was created after the checkpoint
318/// the query is viewing at, then it will fail).
319#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
320struct HistoricalKey {
321    id: IotaAddress,
322    version: u64,
323    checkpoint_viewed_at: u64,
324}
325
326/// `DataLoader` key for fetching objects that haven't been checkpointed yet.
327/// This is used specifically for loading objects
328/// that are part of optimistic transaction effects.
329#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
330struct OptimisticKey {
331    id: IotaAddress,
332    version: u64,
333}
334
335/// `DataLoader` key for fetching the latest version of an object whose parent
336/// object has version `parent_version`, as of `checkpoint_viewed_at`. This
337/// look-up can fail to find a valid object if the key is not self-consistent,
338/// for example if the `parent_version` is set to a higher version
339/// than the object's actual parent as of `checkpoint_viewed_at`.
340#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
341struct ParentVersionKey {
342    id: IotaAddress,
343    parent_version: u64,
344    checkpoint_viewed_at: u64,
345}
346
347/// `DataLoader` key for fetching the latest version of an object as of a given
348/// checkpoint.
349#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
350struct LatestAtKey {
351    id: IotaAddress,
352    checkpoint_viewed_at: u64,
353}
354
355/// An object in IOTA is a package (set of Move bytecode modules) or object
356/// (typed data structure with fields) with additional metadata detailing its
357/// id, version, transaction digest, owner field indicating how this object can
358/// be accessed.
359#[Object]
360impl Object {
361    pub(crate) async fn address(&self) -> IotaAddress {
362        OwnerImpl::from(self).address().await
363    }
364
365    /// Objects owned by this object, optionally `filter`-ed.
366    pub(crate) async fn objects(
367        &self,
368        ctx: &Context<'_>,
369        first: Option<u64>,
370        after: Option<Cursor>,
371        last: Option<u64>,
372        before: Option<Cursor>,
373        filter: Option<ObjectFilter>,
374    ) -> Result<Connection<String, MoveObject>> {
375        OwnerImpl::from(self)
376            .objects(ctx, first, after, last, before, filter)
377            .await
378    }
379
380    /// Total balance of all coins with marker type owned by this object. If
381    /// type is not supplied, it defaults to `0x2::iota::IOTA`.
382    pub(crate) async fn balance(
383        &self,
384        ctx: &Context<'_>,
385        type_: Option<ExactTypeFilter>,
386    ) -> Result<Option<Balance>> {
387        OwnerImpl::from(self).balance(ctx, type_).await
388    }
389
390    /// The balances of all coin types owned by this object.
391    pub(crate) async fn balances(
392        &self,
393        ctx: &Context<'_>,
394        first: Option<u64>,
395        after: Option<balance::Cursor>,
396        last: Option<u64>,
397        before: Option<balance::Cursor>,
398    ) -> Result<Connection<String, Balance>> {
399        OwnerImpl::from(self)
400            .balances(ctx, first, after, last, before)
401            .await
402    }
403
404    /// The coin objects for this object.
405    ///
406    /// `type` is a filter on the coin's type parameter, defaulting to
407    /// `0x2::iota::IOTA`.
408    pub(crate) async fn coins(
409        &self,
410        ctx: &Context<'_>,
411        first: Option<u64>,
412        after: Option<Cursor>,
413        last: Option<u64>,
414        before: Option<Cursor>,
415        type_: Option<ExactTypeFilter>,
416    ) -> Result<Connection<String, Coin>> {
417        OwnerImpl::from(self)
418            .coins(ctx, first, after, last, before, type_)
419            .await
420    }
421
422    /// The `0x3::staking_pool::StakedIota` objects owned by this object.
423    pub(crate) async fn staked_iotas(
424        &self,
425        ctx: &Context<'_>,
426        first: Option<u64>,
427        after: Option<Cursor>,
428        last: Option<u64>,
429        before: Option<Cursor>,
430    ) -> Result<Connection<String, StakedIota>> {
431        OwnerImpl::from(self)
432            .staked_iotas(ctx, first, after, last, before)
433            .await
434    }
435
436    /// The name explicitly configured as the default name pointing to this
437    /// address.
438    pub(crate) async fn iota_names_default_name(
439        &self,
440        ctx: &Context<'_>,
441        format: Option<NameFormat>,
442    ) -> Result<Option<String>> {
443        OwnerImpl::from(self)
444            .iota_names_default_name(ctx, format)
445            .await
446    }
447
448    /// The NameRegistration NFTs owned by this address. These grant the
449    /// owner the capability to manage the associated name.
450    pub(crate) async fn iota_names_registrations(
451        &self,
452        ctx: &Context<'_>,
453        first: Option<u64>,
454        after: Option<Cursor>,
455        last: Option<u64>,
456        before: Option<Cursor>,
457    ) -> Result<Connection<String, NameRegistration>> {
458        OwnerImpl::from(self)
459            .iota_names_registrations(ctx, first, after, last, before)
460            .await
461    }
462
463    pub(crate) async fn version(&self) -> UInt53 {
464        ObjectImpl(self).version().await
465    }
466
467    /// The current status of the object as read from the off-chain store. The
468    /// possible states are:
469    /// - NOT_INDEXED: The object is loaded from serialized data, such as the
470    ///   contents of a genesis or system package upgrade transaction.
471    /// - INDEXED: The object is retrieved from the off-chain index and
472    ///   represents the most recent or historical state of the object.
473    /// - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
474    ///   information can be loaded.
475    pub(crate) async fn status(&self) -> ObjectStatus {
476        ObjectImpl(self).status().await
477    }
478
479    /// 32-byte hash that identifies the object's current contents, encoded as a
480    /// Base58 string.
481    pub(crate) async fn digest(&self) -> Option<String> {
482        ObjectImpl(self).digest().await
483    }
484
485    /// The owner type of this object: Immutable, Shared, Parent, Address
486    /// Immutable and Shared Objects do not have owners.
487    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
488        ObjectImpl(self).owner(ctx).await
489    }
490
491    /// The transaction block that created this version of the object.
492    pub(crate) async fn previous_transaction_block(
493        &self,
494        ctx: &Context<'_>,
495    ) -> Result<Option<TransactionBlock>> {
496        ObjectImpl(self).previous_transaction_block(ctx).await
497    }
498
499    /// The amount of IOTA we would rebate if this object gets deleted or
500    /// mutated. This number is recalculated based on the present storage
501    /// gas price.
502    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
503        ObjectImpl(self).storage_rebate().await
504    }
505
506    /// The transaction blocks that sent objects to this object.
507    ///
508    /// `scanLimit` restricts the number of candidate transactions scanned when
509    /// gathering a page of results. It is required for queries that apply
510    /// more than two complex filters (on function, kind, sender, recipient,
511    /// input object, changed object, or ids), and can be at most
512    /// `serviceConfig.maxScanLimit`.
513    ///
514    /// When the scan limit is reached the page will be returned even if it has
515    /// fewer than `first` results when paginating forward (`last` when
516    /// paginating backwards). If there are more transactions to scan,
517    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
518    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
519    /// to the last transaction that was scanned as opposed to the last (or
520    /// first) transaction in the page.
521    ///
522    /// Requesting the next (or previous) page after this cursor will resume the
523    /// search, scanning the next `scanLimit` many transactions in the
524    /// direction of pagination, and so on until all transactions in the
525    /// scanning range have been visited.
526    ///
527    /// By default, the scanning range includes all transactions known to
528    /// GraphQL, but it can be restricted by the `after` and `before`
529    /// cursors, and the `beforeCheckpoint`, `afterCheckpoint` and
530    /// `atCheckpoint` filters.
531    #[graphql(
532        complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
533    )]
534    pub(crate) async fn received_transaction_blocks(
535        &self,
536        ctx: &Context<'_>,
537        first: Option<u64>,
538        after: Option<transaction_block::Cursor>,
539        last: Option<u64>,
540        before: Option<transaction_block::Cursor>,
541        filter: Option<TransactionBlockFilter>,
542        scan_limit: Option<u64>,
543    ) -> Result<ScanConnection<String, TransactionBlock>> {
544        ObjectImpl(self)
545            .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
546            .await
547    }
548
549    /// The Base64-encoded BCS serialization of the object's content.
550    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
551        ObjectImpl(self).bcs().await
552    }
553
554    /// The set of named templates defined on-chain for the type of this object,
555    /// to be handled off-chain. The server substitutes data from the object
556    /// into these templates to generate a display string per template.
557    async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
558        ObjectImpl(self).display(ctx).await
559    }
560
561    /// Access a dynamic field on an object using its name. Names are arbitrary
562    /// Move values whose type have `copy`, `drop`, and `store`, and are
563    /// specified using their type, and their BCS contents, Base64 encoded.
564    ///
565    /// Dynamic fields on wrapped objects can be accessed by using the same API
566    /// under the Owner type.
567    async fn dynamic_field(
568        &self,
569        ctx: &Context<'_>,
570        name: DynamicFieldName,
571    ) -> Result<Option<DynamicField>> {
572        OwnerImpl::from(self)
573            .dynamic_field(ctx, name, Some(self.root_version()))
574            .await
575    }
576
577    /// Access a dynamic object field on an object using its name. Names are
578    /// arbitrary Move values whose type have `copy`, `drop`, and `store`,
579    /// and are specified using their type, and their BCS contents, Base64
580    /// encoded. The value of a dynamic object field can also be accessed
581    /// off-chain directly via its address (e.g. using `Query.object`).
582    ///
583    /// Dynamic fields on wrapped objects can be accessed by using the same API
584    /// under the Owner type.
585    async fn dynamic_object_field(
586        &self,
587        ctx: &Context<'_>,
588        name: DynamicFieldName,
589    ) -> Result<Option<DynamicField>> {
590        OwnerImpl::from(self)
591            .dynamic_object_field(ctx, name, Some(self.root_version()))
592            .await
593    }
594
595    /// The dynamic fields and dynamic object fields on an object.
596    ///
597    /// Dynamic fields on wrapped objects can be accessed by using the same API
598    /// under the Owner type.
599    async fn dynamic_fields(
600        &self,
601        ctx: &Context<'_>,
602        first: Option<u64>,
603        after: Option<Cursor>,
604        last: Option<u64>,
605        before: Option<Cursor>,
606    ) -> Result<Connection<String, DynamicField>> {
607        OwnerImpl::from(self)
608            .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
609            .await
610    }
611
612    /// Attempts to convert the object into a MoveObject
613    async fn as_move_object(&self) -> Option<MoveObject> {
614        MoveObject::try_from(self).ok()
615    }
616
617    /// Attempts to convert the object into a MovePackage
618    async fn as_move_package(&self) -> Option<MovePackage> {
619        MovePackage::try_from(self).ok()
620    }
621}
622
623impl ObjectImpl<'_> {
624    pub(crate) async fn version(&self) -> UInt53 {
625        self.0.version_impl().into()
626    }
627
628    pub(crate) async fn status(&self) -> ObjectStatus {
629        ObjectStatus::from(&self.0.kind)
630    }
631
632    pub(crate) async fn digest(&self) -> Option<String> {
633        self.0
634            .native_impl()
635            .map(|native| native.digest().base58_encode())
636    }
637
638    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
639        use NativeOwner as O;
640
641        let native = self.0.native_impl()?;
642
643        match native.owner {
644            O::AddressOwner(address) => {
645                let address = IotaAddress::from(address);
646                Some(ObjectOwner::Address(AddressOwner {
647                    owner: Some(Owner {
648                        address,
649                        checkpoint_viewed_at: self.0.checkpoint_viewed_at,
650                        root_version: None,
651                    }),
652                }))
653            }
654            O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
655            O::ObjectOwner(address) => {
656                let parent = Object::query(
657                    ctx,
658                    address.into(),
659                    Object::latest_at(self.0.checkpoint_viewed_at),
660                )
661                .await
662                .ok()
663                .flatten();
664
665                Some(ObjectOwner::Parent(Box::new(Parent { parent })))
666            }
667            O::Shared {
668                initial_shared_version,
669            } => Some(ObjectOwner::Shared(Shared {
670                initial_shared_version: initial_shared_version.value().into(),
671            })),
672        }
673    }
674
675    pub(crate) async fn previous_transaction_block(
676        &self,
677        ctx: &Context<'_>,
678    ) -> Result<Option<TransactionBlock>> {
679        let Some(native) = self.0.native_impl() else {
680            return Ok(None);
681        };
682        let digest = native.previous_transaction;
683        let key = transaction_block::DigestKey::new(digest.into(), self.0.checkpoint_viewed_at);
684
685        TransactionBlock::query(ctx, key.into()).await.extend()
686    }
687
688    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
689        self.0
690            .native_impl()
691            .map(|native| BigInt::from(native.storage_rebate))
692    }
693
694    pub(crate) async fn received_transaction_blocks(
695        &self,
696        ctx: &Context<'_>,
697        first: Option<u64>,
698        after: Option<transaction_block::Cursor>,
699        last: Option<u64>,
700        before: Option<transaction_block::Cursor>,
701        filter: Option<TransactionBlockFilter>,
702        scan_limit: Option<u64>,
703    ) -> Result<ScanConnection<String, TransactionBlock>> {
704        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
705
706        let Some(filter) = filter
707            .unwrap_or_default()
708            .intersect(TransactionBlockFilter {
709                recv_address: Some(self.0.address),
710                ..Default::default()
711            })
712        else {
713            return Ok(ScanConnection::new(false, false));
714        };
715
716        TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
717            .await
718            .extend()
719    }
720
721    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
722        use ObjectKind as K;
723        Ok(match &self.0.kind {
724            K::WrappedOrDeleted(_) => None,
725            // WrappedOrDeleted objects are also read from the historical objects table, and they do
726            // not have a serialized object, so the column is also nullable for stored historical
727            // objects.
728            K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),
729
730            K::NotIndexed(native) => {
731                let bytes = bcs::to_bytes(native)
732                    .map_err(|e| {
733                        Error::Internal(format!(
734                            "Failed to serialize object at {}: {e}",
735                            self.0.address
736                        ))
737                    })
738                    .extend()?;
739                Some(Base64::from(&bytes))
740            }
741        })
742    }
743
744    /// `display` is part of the `IMoveObject` interface, but is implemented on
745    /// `ObjectImpl` to allow for a convenience function on `Object`.
746    pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
747        let Some(native) = self.0.native_impl() else {
748            return Ok(None);
749        };
750
751        let move_object = native
752            .data
753            .try_as_move()
754            .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
755            .extend()?;
756
757        let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
758            .await
759            .extend()?;
760
761        let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
762            .await
763            .extend()?
764        else {
765            return Ok(None);
766        };
767
768        Ok(Some(display.render(&move_struct).extend()?))
769    }
770}
771
772impl Object {
773    /// Construct a GraphQL object from a native object, without its stored
774    /// (indexed) counterpart.
775    ///
776    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
777    /// which this `Object` was constructed in. This is stored on `Object`
778    /// so that when viewing that entity's state, it will be as if it was
779    /// read at the same checkpoint.
780    ///
781    /// `root_version` represents the version of the root object in some nested
782    /// chain of dynamic fields. This should typically be left `None`,
783    /// unless the object(s) being resolved is a dynamic field, or if
784    /// `root_version` has been explicitly set for this object. If None, then
785    /// we use [`version_for_dynamic_fields`] to infer a root version to then
786    /// propagate from this object down to its dynamic fields.
787    pub(crate) fn from_native(
788        address: IotaAddress,
789        native: NativeObject,
790        checkpoint_viewed_at: u64,
791        root_version: Option<u64>,
792    ) -> Object {
793        let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
794        Object {
795            address,
796            kind: ObjectKind::NotIndexed(native),
797            checkpoint_viewed_at,
798            root_version,
799        }
800    }
801
802    pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
803        use ObjectKind as K;
804
805        match &self.kind {
806            K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
807            K::WrappedOrDeleted(_) => None,
808        }
809    }
810
811    pub(crate) fn version_impl(&self) -> u64 {
812        use ObjectKind as K;
813
814        match &self.kind {
815            K::NotIndexed(native) | K::Indexed(native, _) => native.version().value(),
816            K::WrappedOrDeleted(object_version) => *object_version,
817        }
818    }
819
820    /// Root parent object version for dynamic fields.
821    ///
822    /// Check [`Object::root_version`] for details.
823    pub(crate) fn root_version(&self) -> u64 {
824        self.root_version
825    }
826
827    /// Query the database for a `page` of objects, optionally `filter`-ed.
828    ///
829    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
830    /// which this page was queried for. Each entity returned in the
831    /// connection will inherit this checkpoint, so that when viewing that
832    /// entity's state, it will be as if it was read at the same checkpoint.
833    pub(crate) async fn paginate(
834        db: &Db,
835        page: Page<Cursor>,
836        filter: ObjectFilter,
837        checkpoint_viewed_at: u64,
838    ) -> Result<Connection<String, Object>, Error> {
839        Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
840    }
841
842    /// Query the database for a `page` of some sub-type of Object. The page
843    /// uses the bytes of an Object ID and the checkpoint when the query was
844    /// made as the cursor, and can optionally be further `filter`-ed. The
845    /// subtype is created using the `downcast` function, which is allowed
846    /// to fail, if the downcast has failed.
847    ///
848    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
849    /// which this page was queried for. Each entity returned in the
850    /// connection will inherit this checkpoint, so that when viewing that
851    /// entity's state, it will be as if it was read at the same checkpoint.
852    ///
853    /// If a `Page<Cursor>` is also provided, then this function will defer to
854    /// the `checkpoint_viewed_at` in the cursors. Otherwise, use the value
855    /// from the parameter, or set to None. This is so that paginated
856    /// queries are consistent with the previous query that created the
857    /// cursor.
858    pub(crate) async fn paginate_subtype<T: OutputType>(
859        db: &Db,
860        page: Page<Cursor>,
861        filter: ObjectFilter,
862        checkpoint_viewed_at: u64,
863        downcast: impl Fn(Object) -> Result<T, Error>,
864    ) -> Result<Connection<String, T>, Error> {
865        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if
866        // they are consistent. Otherwise, use the value from the parameter, or
867        // set to None. This is so that paginated queries are consistent with
868        // the previous query that created the cursor.
869        let cursor_viewed_at = page.validate_cursor_consistency()?;
870        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
871
872        let Some((prev, next, results)) = db
873            .execute_repeatable(move |conn| {
874                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
875                    return Ok::<_, diesel::result::Error>(None);
876                };
877
878                Ok(Some(page.paginate_raw_query::<StoredHistoryObject>(
879                    conn,
880                    checkpoint_viewed_at,
881                    objects_query(&filter, range, &page),
882                )?))
883            })
884            .await?
885        else {
886            return Err(Error::Client(
887                "Requested data is outside the available range".to_string(),
888            ));
889        };
890
891        let mut conn: Connection<String, T> = Connection::new(prev, next);
892
893        for stored in results {
894            // To maintain consistency, the returned cursor should have the same upper-bound
895            // as the checkpoint found on the cursor.
896            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
897            let object =
898                Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
899            conn.edges.push(Edge::new(cursor, downcast(object)?));
900        }
901
902        Ok(conn)
903    }
904
905    /// Look-up the latest version of the object as of a given checkpoint.
906    pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
907        ObjectLookup::LatestAt {
908            checkpoint_viewed_at,
909        }
910    }
911
912    /// Look-up the latest version of an object whose version is less than or
913    /// equal to its parent's version, as of a given checkpoint.
914    pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
915        ObjectLookup::UnderParent {
916            parent_version,
917            checkpoint_viewed_at,
918        }
919    }
920
921    /// Look-up a specific version of the object, as of a given checkpoint.
922    pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
923        ObjectLookup::VersionAt {
924            version,
925            checkpoint_viewed_at,
926        }
927    }
928
929    /// Look-up a specific version of the object from optimistic transactions.
930    pub(crate) fn at_optimistic_version(version: u64) -> ObjectLookup {
931        ObjectLookup::OptimisticVersion { version }
932    }
933
934    pub(crate) async fn query(
935        ctx: &Context<'_>,
936        id: IotaAddress,
937        key: ObjectLookup,
938    ) -> Result<Option<Self>, Error> {
939        let DataLoader(loader) = &ctx.data_unchecked();
940
941        match key {
942            ObjectLookup::VersionAt {
943                version,
944                checkpoint_viewed_at,
945            } => {
946                loader
947                    .load_one(HistoricalKey {
948                        id,
949                        version,
950                        checkpoint_viewed_at,
951                    })
952                    .await
953            }
954
955            ObjectLookup::OptimisticVersion { version } => {
956                loader.load_one(OptimisticKey { id, version }).await
957            }
958
959            ObjectLookup::UnderParent {
960                parent_version,
961                checkpoint_viewed_at,
962            } => {
963                loader
964                    .load_one(ParentVersionKey {
965                        id,
966                        parent_version,
967                        checkpoint_viewed_at,
968                    })
969                    .await
970            }
971
972            ObjectLookup::LatestAt {
973                checkpoint_viewed_at,
974            } => {
975                loader
976                    .load_one(LatestAtKey {
977                        id,
978                        checkpoint_viewed_at,
979                    })
980                    .await
981            }
982        }
983    }
984
985    /// Query for a singleton object identified by its type. Note: the object is
986    /// assumed to be a singleton (we either find at least one object with
987    /// this type and then return it, or return nothing).
988    pub(crate) async fn query_singleton(
989        db: &Db,
990        type_: StructTag,
991        checkpoint_viewed_at: u64,
992    ) -> Result<Option<Object>, Error> {
993        let filter = ObjectFilter {
994            type_: Some(TypeFilter::ByType(type_)),
995            ..Default::default()
996        };
997
998        let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;
999
1000        Ok(connection.edges.into_iter().next().map(|edge| edge.node))
1001    }
1002
1003    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
1004    /// which this `Object` was constructed in. This is stored on `Object`
1005    /// so that when viewing that entity's state, it will be as if it was
1006    /// read at the same checkpoint.
1007    ///
1008    /// `root_version` represents the version of the root object in some nested
1009    /// chain of dynamic fields. This should typically be left `None`,
1010    /// unless the object(s) being resolved is a dynamic field, or if
1011    /// `root_version` has been explicitly set for this object. If None, then
1012    /// we use [`version_for_dynamic_fields`] to infer a root version to then
1013    /// propagate from this object down to its dynamic fields.
1014    pub(crate) fn try_from_stored_history_object(
1015        history_object: StoredHistoryObject,
1016        checkpoint_viewed_at: u64,
1017        root_version: Option<u64>,
1018    ) -> Result<Self, Error> {
1019        let address = addr(&history_object.object_id)?;
1020
1021        let object_status =
1022            NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
1023                Error::Internal(format!(
1024                    "Unknown object status {} for object {} at version {}",
1025                    history_object.object_status, address, history_object.object_version
1026                ))
1027            })?;
1028
1029        match object_status {
1030            NativeObjectStatus::Active => {
1031                let Some(serialized_object) = &history_object.serialized_object else {
1032                    return Err(Error::Internal(format!(
1033                        "Live object {} at version {} cannot have missing serialized_object field",
1034                        address, history_object.object_version
1035                    )));
1036                };
1037
1038                let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
1039                    Error::Internal(format!("Failed to deserialize object {address}"))
1040                })?;
1041
1042                let root_version =
1043                    root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
1044                Ok(Self {
1045                    address,
1046                    kind: ObjectKind::Indexed(native_object, history_object),
1047                    checkpoint_viewed_at,
1048                    root_version,
1049                })
1050            }
1051            NativeObjectStatus::WrappedOrDeleted => Ok(Self {
1052                address,
1053                kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
1054                checkpoint_viewed_at,
1055                root_version: history_object.object_version as u64,
1056            }),
1057        }
1058    }
1059
1060    pub(crate) fn try_from_stored_object(
1061        stored_object: StoredObject,
1062        checkpoint_viewed_at: u64,
1063    ) -> Result<Self, Error> {
1064        let address = addr(&stored_object.object_id)?;
1065
1066        let native_object = bcs::from_bytes(&stored_object.serialized_object)
1067            .map_err(|_| Error::Internal(format!("Failed to deserialize object {address}")))?;
1068
1069        let root_version = version_for_dynamic_fields(&native_object);
1070
1071        let stored_history_like = StoredHistoryObject {
1072            object_id: stored_object.object_id,
1073            object_version: stored_object.object_version,
1074            object_digest: Some(stored_object.object_digest),
1075            object_status: NativeObjectStatus::Active as i16,
1076            checkpoint_sequence_number: checkpoint_viewed_at as i64,
1077            serialized_object: Some(stored_object.serialized_object),
1078            object_type: stored_object.object_type,
1079            object_type_package: stored_object.object_type_package,
1080            object_type_module: stored_object.object_type_module,
1081            object_type_name: stored_object.object_type_name,
1082            owner_type: Some(stored_object.owner_type),
1083            owner_id: stored_object.owner_id,
1084            coin_type: stored_object.coin_type,
1085            coin_balance: stored_object.coin_balance,
1086            df_kind: stored_object.df_kind,
1087        };
1088
1089        Ok(Self {
1090            address,
1091            kind: ObjectKind::Indexed(native_object, stored_history_like),
1092            checkpoint_viewed_at,
1093            root_version,
1094        })
1095    }
1096}
1097
1098/// We're deliberately choosing to use a child object's version as the root
1099/// here, and letting the caller override it with the actual root object's
1100/// version if it has access to it.
1101///
1102/// Using the child object's version as the root means that we're seeing the
1103/// dynamic field tree under this object at the state resulting from the
1104/// transaction that produced this version.
1105///
1106/// See [`Object::root_version`] for more details on parent/child object version
1107/// mechanics.
1108fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
1109    native.as_inner().version().into()
1110}
1111
1112impl ObjectFilter {
1113    /// Try to create a filter whose results are the intersection of objects in
1114    /// `self`'s results and objects in `other`'s results. This may not be
1115    /// possible if the resulting filter is inconsistent in some way (e.g. a
1116    /// filter that requires one field to be two different values
1117    /// simultaneously).
1118    pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
1119        macro_rules! intersect {
1120            ($field:ident, $body:expr) => {
1121                intersect::field(self.$field, other.$field, $body)
1122            };
1123        }
1124
1125        // Treat `object_ids` and `object_keys` as a single filter on IDs, and
1126        // optionally versions, and compute the intersection of that.
1127        let keys = intersect::field(self.keys(), other.keys(), |k, l| {
1128            let mut combined = BTreeMap::new();
1129
1130            for (id, v) in k {
1131                if let Some(w) = l.get(&id).copied() {
1132                    combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
1133                }
1134            }
1135
1136            // If the intersection is empty, it means, there were some ID or Key filters in
1137            // both `self` and `other`, but they don't overlap, so the final
1138            // result is inconsistent.
1139            (!combined.is_empty()).then_some(combined)
1140        })?;
1141
1142        // Extract the ID and Key filters back out. At this point, we know that if there
1143        // were ID/Key filters in both `self` and `other`, then they intersected
1144        // to form a consistent set of constraints, so it is safe to interpret
1145        // the lack of any ID/Key filters respectively as a lack of that kind of
1146        // constraint, rather than a constraint on the empty set.
1147
1148        let object_ids = {
1149            let partition: Vec<_> = keys
1150                .iter()
1151                .flatten()
1152                .filter_map(|(id, v)| v.is_none().then_some(*id))
1153                .collect();
1154
1155            (!partition.is_empty()).then_some(partition)
1156        };
1157
1158        let object_keys = {
1159            let partition: Vec<_> = keys
1160                .iter()
1161                .flatten()
1162                .filter_map(|(id, v)| {
1163                    Some(ObjectKey {
1164                        object_id: *id,
1165                        version: (*v)?.into(),
1166                    })
1167                })
1168                .collect();
1169
1170            (!partition.is_empty()).then_some(partition)
1171        };
1172
1173        Some(Self {
1174            type_: intersect!(type_, TypeFilter::intersect)?,
1175            owner: intersect!(owner, intersect::by_eq)?,
1176            object_ids,
1177            object_keys,
1178        })
1179    }
1180
1181    /// Extract the Object ID and Key filters into one combined map from Object
1182    /// IDs in this filter, to the versions they should have (or None if the
1183    /// filter mentions the ID but no version for it).
1184    fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
1185        if self.object_keys.is_none() && self.object_ids.is_none() {
1186            return None;
1187        }
1188
1189        Some(BTreeMap::from_iter(
1190            self.object_keys
1191                .iter()
1192                .flatten()
1193                .map(|key| (key.object_id, Some(key.version.into())))
1194                // Chain ID filters after Key filters so if there is overlap, we overwrite the key
1195                // filter with the ID filter.
1196                .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
1197        ))
1198    }
1199
1200    /// Applies ObjectFilter to the input `RawQuery` and returns a new
1201    /// `RawQuery`.
1202    pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
1203        // Start by applying the filters on IDs and/or keys because they are combined as
1204        // a disjunction, while the remaining queries are conjunctions.
1205        if let Some(object_ids) = &self.object_ids {
1206            // Maximally strict - match a vec of 0 elements
1207            if object_ids.is_empty() {
1208                query = or_filter!(query, "1=0");
1209            } else {
1210                let mut inner = String::new();
1211                let mut prefix = "object_id IN (";
1212                for id in object_ids {
1213                    // SAFETY: Writing to a `String` cannot fail.
1214                    write!(
1215                        &mut inner,
1216                        "{prefix}'\\x{}'::bytea",
1217                        hex::encode(id.into_vec())
1218                    )
1219                    .unwrap();
1220                    prefix = ", ";
1221                }
1222                inner.push(')');
1223                query = or_filter!(query, inner);
1224            }
1225        }
1226
1227        if let Some(object_keys) = &self.object_keys {
1228            // Maximally strict - match a vec of 0 elements
1229            if object_keys.is_empty() {
1230                query = or_filter!(query, "1=0");
1231            } else {
1232                let mut inner = String::new();
1233                let mut prefix = "(";
1234                for ObjectKey { object_id, version } in object_keys {
1235                    // SAFETY: Writing to a `String` cannot fail.
1236                    write!(
1237                        &mut inner,
1238                        "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
1239                        hex::encode(object_id.into_vec()),
1240                        version
1241                    )
1242                    .unwrap();
1243                    prefix = " OR ";
1244                }
1245                inner.push(')');
1246                query = or_filter!(query, inner);
1247            }
1248        }
1249
1250        if let Some(owner) = self.owner {
1251            query = filter!(
1252                query,
1253                format!(
1254                    "owner_id = '\\x{}'::bytea AND owner_type = {}",
1255                    hex::encode(owner.into_vec()),
1256                    OwnerType::Address as i16
1257                )
1258            );
1259        }
1260
1261        if let Some(type_) = &self.type_ {
1262            return type_.apply_raw(
1263                query,
1264                "object_type",
1265                "object_type_package",
1266                "object_type_module",
1267                "object_type_name",
1268            );
1269        }
1270
1271        query
1272    }
1273
1274    pub(crate) fn has_filters(&self) -> bool {
1275        self != &Default::default()
1276    }
1277}
1278
1279impl HistoricalObjectCursor {
1280    pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
1281        Self {
1282            object_id,
1283            checkpoint_viewed_at,
1284        }
1285    }
1286}
1287
1288impl Checkpointed for Cursor {
1289    fn checkpoint_viewed_at(&self) -> u64 {
1290        self.checkpoint_viewed_at
1291    }
1292}
1293
1294impl ScanLimited for Cursor {}
1295
1296impl RawPaginated<Cursor> for StoredHistoryObject {
1297    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1298        filter!(
1299            query,
1300            format!(
1301                "candidates.object_id >= '\\x{}'::bytea",
1302                hex::encode(cursor.object_id.clone())
1303            )
1304        )
1305    }
1306
1307    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1308        filter!(
1309            query,
1310            format!(
1311                "candidates.object_id <= '\\x{}'::bytea",
1312                hex::encode(cursor.object_id.clone())
1313            )
1314        )
1315    }
1316
1317    fn order(asc: bool, query: RawQuery) -> RawQuery {
1318        if asc {
1319            query.order_by("candidates.object_id ASC")
1320        } else {
1321            query.order_by("candidates.object_id DESC")
1322        }
1323    }
1324}
1325
1326impl Target<Cursor> for StoredHistoryObject {
1327    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1328        Cursor::new(HistoricalObjectCursor::new(
1329            self.object_id.clone(),
1330            checkpoint_viewed_at,
1331        ))
1332    }
1333}
1334
1335impl Loader<HistoricalKey> for Db {
1336    type Value = Object;
1337    type Error = Error;
1338
1339    async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
1340        use objects_history::dsl as h;
1341        use objects_version::dsl as v;
1342
1343        if keys.is_empty() {
1344            return Ok(HashMap::new());
1345        }
1346
1347        let id_versions: BTreeSet<_> = keys
1348            .iter()
1349            .map(|key| (key.id.into_vec(), key.version as i64))
1350            .collect();
1351
1352        let objects: Vec<StoredHistoryObject> = self
1353            .execute(move |conn| {
1354                conn.results(move || {
1355                    let mut query = h::objects_history
1356                        .inner_join(
1357                            v::objects_version.on(v::cp_sequence_number
1358                                .eq(h::checkpoint_sequence_number)
1359                                .and(v::object_id.eq(h::object_id))
1360                                .and(v::object_version.eq(h::object_version))),
1361                        )
1362                        .select(StoredHistoryObject::as_select())
1363                        .into_boxed();
1364
1365                    for (id, version) in id_versions.iter().cloned() {
1366                        query =
1367                            query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
1368                    }
1369
1370                    query
1371                })
1372            })
1373            .await
1374            .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;
1375
1376        let mut id_version_to_stored = BTreeMap::new();
1377        for stored in objects {
1378            let key = (addr(&stored.object_id)?, stored.object_version as u64);
1379            id_version_to_stored.insert(key, stored);
1380        }
1381
1382        let mut result = HashMap::new();
1383        for key in keys {
1384            let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
1385                continue;
1386            };
1387
1388            // Filter by key's checkpoint viewed at here. Doing this in memory because it
1389            // should be quite rare that this query actually filters something,
1390            // but encoding it in SQL is complicated.
1391            if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1392                continue;
1393            }
1394
1395            let object = Object::try_from_stored_history_object(
1396                stored.clone(),
1397                key.checkpoint_viewed_at,
1398                // This conversion will use the object's own version as the `Object::root_version`.
1399                None,
1400            )?;
1401            result.insert(*key, object);
1402        }
1403
1404        Ok(result)
1405    }
1406}
1407
1408impl Loader<OptimisticKey> for Db {
1409    type Value = Object;
1410    type Error = Error;
1411
1412    async fn load(&self, keys: &[OptimisticKey]) -> Result<HashMap<OptimisticKey, Object>, Error> {
1413        use objects::dsl as o;
1414
1415        if keys.is_empty() {
1416            return Ok(HashMap::new());
1417        }
1418
1419        let id_versions: BTreeSet<_> = keys
1420            .iter()
1421            .map(|key| (key.id.into_vec(), key.version as i64))
1422            .collect();
1423
1424        let objects: Vec<StoredObject> = self
1425            .execute(move |conn| {
1426                conn.results(move || {
1427                    let mut query = o::objects.select(StoredObject::as_select()).into_boxed();
1428                    for (id, version) in id_versions.iter().cloned() {
1429                        query =
1430                            query.or_filter(o::object_id.eq(id).and(o::object_version.eq(version)));
1431                    }
1432                    query
1433                })
1434            })
1435            .await
1436            .map_err(|e| Error::Internal(format!("Failed to fetch optimistic objects: {e}")))?;
1437
1438        let mut result = HashMap::new();
1439        let id_version_to_stored = objects
1440            .into_iter()
1441            .map(|stored| {
1442                addr(&stored.object_id).map(|id| ((id, stored.object_version as u64), stored))
1443            })
1444            .collect::<Result<BTreeMap<_, _>, _>>()?;
1445
1446        // Collect keys that were not found in objects table
1447        let mut missing_keys = Vec::new();
1448        for key in keys {
1449            if let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) {
1450                let object = Object::try_from_stored_object(stored.clone(), u64::MAX)?;
1451                result.insert(*key, object);
1452            } else {
1453                missing_keys.push(*key);
1454            }
1455        }
1456
1457        // For missing keys, fallback to using the objects_history table
1458        if !missing_keys.is_empty() {
1459            let historical_keys: Vec<HistoricalKey> = missing_keys
1460                .iter()
1461                .map(|key| HistoricalKey {
1462                    id: key.id,
1463                    version: key.version,
1464                    checkpoint_viewed_at: u64::MAX,
1465                })
1466                .collect();
1467
1468            let historical_result: HashMap<HistoricalKey, Object> =
1469                self.load(&historical_keys).await?;
1470
1471            for (historical_key, object) in historical_result {
1472                let optimistic_key = OptimisticKey {
1473                    id: historical_key.id,
1474                    version: historical_key.version,
1475                };
1476                result.insert(optimistic_key, object);
1477            }
1478        }
1479
1480        Ok(result)
1481    }
1482}
1483
1484impl Loader<ParentVersionKey> for Db {
1485    type Value = Object;
1486    type Error = Error;
1487
1488    async fn load(
1489        &self,
1490        keys: &[ParentVersionKey],
1491    ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
1492        // Group keys by checkpoint viewed at and parent version -- we'll issue a
1493        // separate query for each group.
1494        #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
1495        struct GroupKey {
1496            checkpoint_viewed_at: u64,
1497            parent_version: u64,
1498        }
1499
1500        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1501        for key in keys {
1502            let group_key = GroupKey {
1503                checkpoint_viewed_at: key.checkpoint_viewed_at,
1504                parent_version: key.parent_version,
1505            };
1506
1507            keys_by_cursor_and_parent_version
1508                .entry(group_key)
1509                .or_default()
1510                .insert(key.id.into_vec());
1511        }
1512
1513        // Issue concurrent reads for each group of keys.
1514        let futures = keys_by_cursor_and_parent_version
1515            .into_iter()
1516            .map(|(group_key, ids)| {
1517                self.execute(move |conn| {
1518                    let stored: Vec<StoredHistoryObject> = conn.results(move || {
1519                        use objects_history::dsl as h;
1520                        use objects_version::dsl as v;
1521
1522                        h::objects_history
1523                            .inner_join(
1524                                v::objects_version.on(v::cp_sequence_number
1525                                    .eq(h::checkpoint_sequence_number)
1526                                    .and(v::object_id.eq(h::object_id))
1527                                    .and(v::object_version.eq(h::object_version))),
1528                            )
1529                            .select(StoredHistoryObject::as_select())
1530                            .filter(v::object_id.eq_any(ids.iter().cloned()))
1531                            .filter(v::object_version.le(group_key.parent_version as i64))
1532                            .distinct_on(v::object_id)
1533                            .order_by(v::object_id)
1534                            .then_order_by(v::object_version.desc())
1535                            .into_boxed()
1536                    })?;
1537
1538                    Ok::<_, diesel::result::Error>(
1539                        stored
1540                            .into_iter()
1541                            .map(|stored| (group_key, stored))
1542                            .collect::<Vec<_>>(),
1543                    )
1544                })
1545            });
1546
1547        // Wait for the reads to all finish, and gather them into the result map.
1548        let groups = futures::future::join_all(futures).await;
1549
1550        let mut results = HashMap::new();
1551        for group in groups {
1552            for (group_key, stored) in
1553                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1554            {
1555                // This particular object is invalid -- it didn't exist at the checkpoint we are
1556                // viewing at.
1557                if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1558                    continue;
1559                }
1560
1561                let object = Object::try_from_stored_history_object(
1562                    stored,
1563                    group_key.checkpoint_viewed_at,
1564                    // If `LatestAtKey::parent_version` is set, it must have been correctly
1565                    // propagated from the `Object::root_version` of some object.
1566                    Some(group_key.parent_version),
1567                )?;
1568
1569                let key = ParentVersionKey {
1570                    id: object.address,
1571                    checkpoint_viewed_at: group_key.checkpoint_viewed_at,
1572                    parent_version: group_key.parent_version,
1573                };
1574
1575                results.insert(key, object);
1576            }
1577        }
1578
1579        Ok(results)
1580    }
1581}
1582
1583impl Loader<LatestAtKey> for Db {
1584    type Value = Object;
1585    type Error = Error;
1586
1587    async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
1588        // Group keys by checkpoint viewed at -- we'll issue a separate query for each
1589        // group.
1590        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1591
1592        for key in keys {
1593            keys_by_cursor_and_parent_version
1594                .entry(key.checkpoint_viewed_at)
1595                .or_default()
1596                .insert(key.id);
1597        }
1598
1599        // Issue concurrent reads for each group of keys.
1600        let futures =
1601            keys_by_cursor_and_parent_version
1602                .into_iter()
1603                .map(|(checkpoint_viewed_at, ids)| {
1604                    self.execute_repeatable(move |conn| {
1605                        let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)?
1606                        else {
1607                            return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
1608                                vec![],
1609                            );
1610                        };
1611
1612                        let filter = ObjectFilter {
1613                            object_ids: Some(ids.iter().cloned().collect()),
1614                            ..Default::default()
1615                        };
1616
1617                        Ok(conn
1618                            .results(move || {
1619                                build_objects_query(
1620                                    View::Consistent,
1621                                    range,
1622                                    &Page::bounded(ids.len() as u64),
1623                                    |q| filter.apply(q),
1624                                    |q| q,
1625                                )
1626                                .into_boxed()
1627                            })?
1628                            .into_iter()
1629                            .map(|r| (checkpoint_viewed_at, r))
1630                            .collect())
1631                    })
1632                });
1633
1634        // Wait for the reads to all finish, and gather them into the result map.
1635        let groups = futures::future::join_all(futures).await;
1636
1637        let mut results = HashMap::new();
1638        for group in groups {
1639            for (checkpoint_viewed_at, stored) in
1640                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1641            {
1642                let object =
1643                    Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
1644
1645                let key = LatestAtKey {
1646                    id: object.address,
1647                    checkpoint_viewed_at,
1648                };
1649
1650                results.insert(key, object);
1651            }
1652        }
1653
1654        Ok(results)
1655    }
1656}
1657
1658impl From<&ObjectKind> for ObjectStatus {
1659    fn from(kind: &ObjectKind) -> Self {
1660        match kind {
1661            ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
1662            ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
1663            ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
1664        }
1665    }
1666}
1667
1668impl From<&Object> for OwnerImpl {
1669    fn from(object: &Object) -> Self {
1670        OwnerImpl {
1671            address: object.address,
1672            checkpoint_viewed_at: object.checkpoint_viewed_at,
1673        }
1674    }
1675}
1676
1677pub(crate) async fn deserialize_move_struct(
1678    move_object: &NativeMoveObject,
1679    resolver: &PackageResolver,
1680) -> Result<(StructTag, MoveStruct), Error> {
1681    let struct_tag = StructTag::from(move_object.type_().clone());
1682    let contents = move_object.contents();
1683    let move_type_layout = resolver
1684        .type_layout(TypeTag::from(struct_tag.clone()))
1685        .await
1686        .map_err(|e| {
1687            Error::Internal(format!(
1688                "Error fetching layout for type {}: {e}",
1689                struct_tag.to_canonical_string(/* with_prefix */ true)
1690            ))
1691        })?;
1692
1693    let MoveTypeLayout::Struct(layout) = move_type_layout else {
1694        return Err(Error::Internal("Object is not a move struct".to_string()));
1695    };
1696
1697    // TODO (annotated-visitor): Use custom visitors for extracting a dynamic field,
1698    // and for creating a GraphQL MoveValue directly (not via an annotated
1699    // visitor).
1700    let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
1701        Error::Internal(format!(
1702            "Error deserializing move struct for type {}: {e}",
1703            struct_tag.to_canonical_string(/* with_prefix */ true)
1704        ))
1705    })?;
1706
1707    Ok((struct_tag, move_struct))
1708}
1709
1710/// Constructs a raw query to fetch objects from the database. Objects are
1711/// filtered out if they satisfy the criteria but have a later version in the
1712/// same checkpoint. If object keys are provided, or no filters are specified at
1713/// all, then this final condition is not applied.
1714fn objects_query(filter: &ObjectFilter, range: AvailableRange, page: &Page<Cursor>) -> RawQuery
1715where
1716{
1717    if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
1718        // If both object IDs and object keys are specified, then we need to query in
1719        // both historical and consistent views, and then union the results.
1720        let ids_only_filter = ObjectFilter {
1721            object_keys: None,
1722            ..filter.clone()
1723        };
1724        let (id_query, id_bindings) = build_objects_query(
1725            View::Consistent,
1726            range,
1727            page,
1728            move |query| ids_only_filter.apply(query),
1729            move |newer| newer,
1730        )
1731        .finish();
1732
1733        let keys_only_filter = ObjectFilter {
1734            object_ids: None,
1735            ..filter.clone()
1736        };
1737        let (key_query, key_bindings) = build_objects_query(
1738            View::Historical,
1739            range,
1740            page,
1741            move |query| keys_only_filter.apply(query),
1742            move |newer| newer,
1743        )
1744        .finish();
1745
1746        RawQuery::new(
1747            format!("SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates",),
1748            id_bindings.into_iter().chain(key_bindings).collect(),
1749        )
1750        .order_by("object_id")
1751        .limit(page.limit() as i64)
1752    } else {
1753        // Only one of object IDs or object keys is specified, or neither are specified.
1754        let view = if filter.object_keys.is_some() || !filter.has_filters() {
1755            View::Historical
1756        } else {
1757            View::Consistent
1758        };
1759
1760        build_objects_query(
1761            view,
1762            range,
1763            page,
1764            move |query| filter.apply(query),
1765            move |newer| newer,
1766        )
1767    }
1768}
1769
1770#[cfg(test)]
1771mod tests {
1772    use std::str::FromStr;
1773
1774    use super::*;
1775
1776    #[test]
1777    fn test_owner_filter_intersection() {
1778        let f0 = ObjectFilter {
1779            owner: Some(IotaAddress::from_str("0x1").unwrap()),
1780            ..Default::default()
1781        };
1782
1783        let f1 = ObjectFilter {
1784            owner: Some(IotaAddress::from_str("0x2").unwrap()),
1785            ..Default::default()
1786        };
1787
1788        assert_eq!(f0.clone().intersect(f0.clone()), Some(f0.clone()));
1789        assert_eq!(f0.clone().intersect(f1.clone()), None);
1790    }
1791
1792    #[test]
1793    fn test_key_filter_intersection() {
1794        let i1 = IotaAddress::from_str("0x1").unwrap();
1795        let i2 = IotaAddress::from_str("0x2").unwrap();
1796        let i3 = IotaAddress::from_str("0x3").unwrap();
1797        let i4 = IotaAddress::from_str("0x4").unwrap();
1798
1799        let f0 = ObjectFilter {
1800            object_ids: Some(vec![i1, i3]),
1801            object_keys: Some(vec![
1802                ObjectKey {
1803                    object_id: i2,
1804                    version: 1.into(),
1805                },
1806                ObjectKey {
1807                    object_id: i4,
1808                    version: 2.into(),
1809                },
1810            ]),
1811            ..Default::default()
1812        };
1813
1814        let f1 = ObjectFilter {
1815            object_ids: Some(vec![i1, i2]),
1816            object_keys: Some(vec![ObjectKey {
1817                object_id: i4,
1818                version: 2.into(),
1819            }]),
1820            ..Default::default()
1821        };
1822
1823        let f2 = ObjectFilter {
1824            object_ids: Some(vec![i1, i3]),
1825            ..Default::default()
1826        };
1827
1828        let f3 = ObjectFilter {
1829            object_keys: Some(vec![
1830                ObjectKey {
1831                    object_id: i2,
1832                    version: 2.into(),
1833                },
1834                ObjectKey {
1835                    object_id: i4,
1836                    version: 2.into(),
1837                },
1838            ]),
1839            ..Default::default()
1840        };
1841
1842        assert_eq!(
1843            f0.clone().intersect(f1.clone()),
1844            Some(ObjectFilter {
1845                object_ids: Some(vec![i1]),
1846                object_keys: Some(vec![
1847                    ObjectKey {
1848                        object_id: i2,
1849                        version: 1.into(),
1850                    },
1851                    ObjectKey {
1852                        object_id: i4,
1853                        version: 2.into(),
1854                    },
1855                ]),
1856                ..Default::default()
1857            })
1858        );
1859
1860        assert_eq!(
1861            f1.clone().intersect(f2.clone()),
1862            Some(ObjectFilter {
1863                object_ids: Some(vec![i1]),
1864                ..Default::default()
1865            })
1866        );
1867
1868        assert_eq!(
1869            f1.clone().intersect(f3.clone()),
1870            Some(ObjectFilter {
1871                object_keys: Some(vec![
1872                    ObjectKey {
1873                        object_id: i2,
1874                        version: 2.into(),
1875                    },
1876                    ObjectKey {
1877                        object_id: i4,
1878                        version: 2.into(),
1879                    },
1880                ]),
1881                ..Default::default()
1882            })
1883        );
1884
1885        // i2 got a conflicting version assignment
1886        assert_eq!(f0.clone().intersect(f3.clone()), None);
1887
1888        // No overlap between these two.
1889        assert_eq!(f2.clone().intersect(f3.clone()), None);
1890    }
1891}