iota_graphql_rpc/types/
object.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt::Write,
8};
9
10use async_graphql::{
11    connection::{Connection, CursorType, Edge},
12    dataloader::Loader,
13    *,
14};
15use diesel::{
16    BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper, sql_types,
17};
18use iota_indexer::{
19    models::objects::{StoredHistoryObject, StoredObject},
20    schema::{objects, objects_history, objects_version},
21    types::{ObjectStatus as NativeObjectStatus, OwnerType},
22};
23use iota_types::{
24    base_types::{StructTag, TypeTag},
25    object::{
26        MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
27        bounded_visitor::BoundedVisitor,
28    },
29};
30use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout};
31use serde::{Deserialize, Serialize};
32
33use crate::{
34    backward_view::{HistoricalFilter, consistent, historical},
35    config::DEFAULT_PAGE_SIZE,
36    connection::ScanConnection,
37    consistency::Checkpointed,
38    data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
39    error::Error,
40    filter, or_filter,
41    raw_query::RawQuery,
42    types::{
43        available_range::AvailableRange,
44        balance::{self, Balance},
45        base64::Base64,
46        big_int::BigInt,
47        coin::Coin,
48        coin_metadata::CoinMetadata,
49        cursor::{self, Page, RawPaginated, ScanLimited, Target},
50        digest::Digest,
51        display::{Display, DisplayEntry},
52        dynamic_field::{DynamicField, DynamicFieldName},
53        intersect,
54        iota_address::{IotaAddress, addr},
55        iota_names_registration::{NameFormat, NameRegistration},
56        move_object::MoveObject,
57        move_package::MovePackage,
58        owner::{Owner, OwnerImpl},
59        stake::StakedIota,
60        transaction_block,
61        transaction_block::{TransactionBlock, TransactionBlockFilter},
62        type_filter::{ExactTypeFilter, TypeFilter},
63        uint53::UInt53,
64    },
65};
66
67#[derive(Clone, Debug)]
68pub(crate) struct Object {
69    pub address: IotaAddress,
70    pub kind: ObjectKind,
71    /// The checkpoint sequence number at which this was viewed at.
72    pub checkpoint_viewed_at: u64,
73    /// Root parent object version for dynamic fields.
74    ///
75    /// This enables consistent dynamic field reads in the case of chained
76    /// dynamic object fields, e.g., `Parent -> DOF1 -> DOF2`. In such
77    /// cases, the object versions may end up like `Parent >= DOF1, DOF2`
78    /// but `DOF1 < DOF2`. Thus, database queries for dynamic fields must
79    /// bound the object versions by the version of the root object of the tree.
80    ///
81    /// Essentially, lamport timestamps of objects are updated for all top-level
82    /// mutable objects provided as inputs to a transaction as well as any
83    /// mutated dynamic child objects. However, any dynamic child objects
84    /// that were loaded but not actually mutated don't end up having
85    /// their versions updated.
86    root_version: u64,
87}
88
89/// Type to implement GraphQL fields that are shared by all Objects.
90pub(crate) struct ObjectImpl<'o>(pub &'o Object);
91
92#[derive(Clone, Debug)]
93#[expect(clippy::large_enum_variant)]
94pub(crate) enum ObjectKind {
95    /// An object loaded from serialized data, such as the contents of a
96    /// transaction that hasn't been indexed yet.
97    NotIndexed(NativeObject),
98    /// An object fetched from the index.
99    Indexed(NativeObject, StoredHistoryObject),
100    /// The object is wrapped or deleted and only partial information can be
101    /// loaded from the indexer. The `u64` is the version of the object.
102    WrappedOrDeleted(u64),
103}
104
105#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
106#[graphql(name = "ObjectKind")]
107pub enum ObjectStatus {
108    /// The object is loaded from serialized data, such as the contents of a
109    /// transaction that hasn't been indexed yet.
110    NotIndexed,
111    /// The object is fetched from the index.
112    Indexed,
113    /// The object is deleted or wrapped and only partial information can be
114    /// loaded from the indexer.
115    #[graphql(
116        deprecation = "will be removed with v1.26, as such objects can be considered non-existent"
117    )]
118    WrappedOrDeleted,
119}
120
121#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
122pub(crate) struct ObjectRef {
123    /// ID of the object.
124    pub address: IotaAddress,
125    /// Version or sequence number of the object.
126    pub version: UInt53,
127    /// Digest of the object.
128    pub digest: Digest,
129}
130
131/// Constrains the set of objects returned. All filters are optional, and the
132/// resulting set of objects are ones whose
133///
134/// - Type matches the `type` filter,
135/// - AND, whose owner matches the `owner` filter,
136/// - AND, whose ID is in `objectIds` OR whose ID and version is in
137///   `objectKeys`.
138#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
139pub(crate) struct ObjectFilter {
140    /// Filter objects by their type's `package`, `package::module`, or their
141    /// fully qualified type name.
142    ///
143    /// Generic types can be queried by either the generic type name, e.g.
144    /// `0x2::coin::Coin`, or by the full type name, such as
145    /// `0x2::coin::Coin<0x2::iota::IOTA>`.
146    pub type_: Option<TypeFilter>,
147
148    /// Filter for live objects by their current owners.
149    pub owner: Option<IotaAddress>,
150
151    /// Filter for live objects by their IDs.
152    pub object_ids: Option<Vec<IotaAddress>>,
153
154    /// Filter for live or potentially historical objects by their ID and
155    /// version.
156    pub object_keys: Option<Vec<ObjectKey>>,
157}
158
159#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
160pub(crate) struct ObjectKey {
161    pub object_id: IotaAddress,
162    pub version: UInt53,
163}
164
165/// The object's owner type: Immutable, Shared, Parent, or Address.
166#[derive(Union, Clone)]
167pub(crate) enum ObjectOwner {
168    Immutable(Immutable),
169    Shared(Shared),
170    Parent(Box<Parent>),
171    Address(AddressOwner),
172}
173
174/// An immutable object is an object that can't be mutated, transferred, or
175/// deleted. Immutable objects have no owner, so anyone can use them.
176#[derive(SimpleObject, Clone)]
177pub(crate) struct Immutable {
178    #[graphql(name = "_")]
179    dummy: Option<bool>,
180}
181
182/// A shared object is an object that is shared using the
183/// 0x2::transfer::share_object function. Unlike owned objects, once an object
184/// is shared, it stays mutable and is accessible by anyone.
185#[derive(SimpleObject, Clone)]
186pub(crate) struct Shared {
187    initial_shared_version: UInt53,
188}
189
190/// If the object's owner is a Parent, this object is part of a dynamic field
191/// (it is the value of the dynamic field, or the intermediate Field object
192/// itself). Also note that if the owner is a parent, then it's guaranteed to be
193/// an object.
194#[derive(SimpleObject, Clone)]
195pub(crate) struct Parent {
196    parent: Option<Object>,
197}
198
199/// An address-owned object is owned by a specific 32-byte address that is
200/// either an account address (derived from a particular signature scheme) or
201/// an object ID. An address-owned object is accessible only to its owner and no
202/// others.
203#[derive(SimpleObject, Clone)]
204pub(crate) struct AddressOwner {
205    owner: Option<Owner>,
206}
207
/// The different ways a single [`Object`] can be looked up (point query).
pub(crate) enum ObjectLookup {
    /// The latest version of the object as of a checkpoint.
    LatestAt {
        /// Sequence number of the checkpoint the object is viewed at.
        checkpoint_viewed_at: u64,
    },

    /// The latest version of a child object that does not exceed the version
    /// of its parent.
    UnderParent {
        /// Upper bound for the query: the result is the latest version of the
        /// child object that is less than or equal to this parent version.
        parent_version: u64,
        /// Sequence number of the checkpoint the object is viewed at.
        checkpoint_viewed_at: u64,
    },

    /// One exact version of the object, viewed at a checkpoint.
    VersionAt {
        /// The exact object version to fetch.
        version: u64,
        /// Sequence number of the checkpoint the object is viewed at.
        checkpoint_viewed_at: u64,
    },

    /// Counterpart of [`VersionAt`](Self::VersionAt) for optimistic
    /// transactions: it reads the most recent data that has not been
    /// checkpointed yet and is not bound by any checkpoint sequence number.
    OptimisticVersion {
        /// The exact object version to fetch.
        version: u64,
    },
}
239
240pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;
241
242/// The inner struct for the `Object`'s cursor. The `object_id` is used as the
243/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
244/// the cursor.
245#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
246pub(crate) struct HistoricalObjectCursor {
247    #[serde(rename = "o")]
248    object_id: Vec<u8>,
249    /// The checkpoint sequence number this was viewed at.
250    #[serde(rename = "c")]
251    checkpoint_viewed_at: u64,
252}
253
254/// Interface implemented by on-chain values that are addressable by an ID (also
255/// referred to as its address). This includes Move objects and packages.
256#[expect(clippy::duplicated_attributes)]
257#[derive(Interface)]
258#[graphql(
259    name = "IObject",
260    field(name = "version", ty = "UInt53"),
261    field(
262        name = "status",
263        ty = "ObjectStatus",
264        desc = r#"
265            The current status of the object as read from the off-chain store. The
266            possible states are:
267            - NOT_INDEXED: The object is loaded from serialized data, such as the
268            contents of a genesis or system package upgrade transaction.
269            - INDEXED: The object is retrieved from the off-chain index and
270            represents the most recent or historical state of the object.
271            - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
272            information can be loaded.
273        "#
274    ),
275    field(
276        name = "digest",
277        ty = "Option<String>",
278        desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
279                string."
280    ),
281    field(
282        name = "owner",
283        ty = "Option<ObjectOwner>",
284        desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
285                Immutable and Shared Objects do not have owners."
286    ),
287    field(
288        name = "previous_transaction_block",
289        ty = "Option<TransactionBlock>",
290        desc = "The transaction block that created this version of the object."
291    ),
292    field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
293    field(
294        name = "received_transaction_blocks",
295        arg(name = "first", ty = "Option<u64>"),
296        arg(name = "after", ty = "Option<transaction_block::Cursor>"),
297        arg(name = "last", ty = "Option<u64>"),
298        arg(name = "before", ty = "Option<transaction_block::Cursor>"),
299        arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
300        arg(name = "scan_limit", ty = "Option<u64>"),
301        ty = "ScanConnection<String, TransactionBlock>",
302        desc = "The transaction blocks that sent objects to this object."
303    ),
304    field(
305        name = "bcs",
306        ty = "Option<Base64>",
307        desc = "The Base64-encoded BCS serialization of the object's content."
308    )
309)]
310pub(crate) enum IObject {
311    Object(Object),
312    MovePackage(MovePackage),
313    MoveObject(MoveObject),
314    Coin(Coin),
315    CoinMetadata(CoinMetadata),
316    StakedIota(StakedIota),
317}
318
319/// `DataLoader` key for fetching an `Object` at a specific version, constrained
320/// by a consistency cursor (if that version was created after the checkpoint
321/// the query is viewing at, then it will fail).
322#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
323struct HistoricalKey {
324    id: IotaAddress,
325    version: u64,
326    checkpoint_viewed_at: u64,
327}
328
329/// `DataLoader` key for fetching objects that haven't been checkpointed yet.
330/// This is used specifically for loading objects
331/// that are part of optimistic transaction effects.
332#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
333struct OptimisticKey {
334    id: IotaAddress,
335    version: u64,
336}
337
338/// `DataLoader` key for fetching the latest version of an object whose parent
339/// object has version `parent_version`, as of `checkpoint_viewed_at`. This
340/// look-up can fail to find a valid object if the key is not self-consistent,
341/// for example if the `parent_version` is set to a higher version
342/// than the object's actual parent as of `checkpoint_viewed_at`.
343#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
344struct ParentVersionKey {
345    id: IotaAddress,
346    parent_version: u64,
347    checkpoint_viewed_at: u64,
348}
349
350/// `DataLoader` key for fetching the latest version of an object as of a given
351/// checkpoint.
352#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
353struct LatestAtKey {
354    id: IotaAddress,
355    checkpoint_viewed_at: u64,
356}
357
358/// An object in IOTA is a package (set of Move bytecode modules) or object
359/// (typed data structure with fields) with additional metadata detailing its
360/// id, version, transaction digest, owner field indicating how this object can
361/// be accessed.
362#[Object]
363impl Object {
364    pub(crate) async fn address(&self) -> IotaAddress {
365        OwnerImpl::from(self).address().await
366    }
367
368    /// Objects owned by this object, optionally `filter`-ed.
369    pub(crate) async fn objects(
370        &self,
371        ctx: &Context<'_>,
372        first: Option<u64>,
373        after: Option<Cursor>,
374        last: Option<u64>,
375        before: Option<Cursor>,
376        filter: Option<ObjectFilter>,
377    ) -> Result<Connection<String, MoveObject>> {
378        OwnerImpl::from(self)
379            .objects(ctx, first, after, last, before, filter)
380            .await
381    }
382
383    /// Total balance of all coins with marker type owned by this object. If
384    /// type is not supplied, it defaults to `0x2::iota::IOTA`.
385    pub(crate) async fn balance(
386        &self,
387        ctx: &Context<'_>,
388        type_: Option<ExactTypeFilter>,
389    ) -> Result<Option<Balance>> {
390        OwnerImpl::from(self).balance(ctx, type_).await
391    }
392
393    /// The balances of all coin types owned by this object.
394    pub(crate) async fn balances(
395        &self,
396        ctx: &Context<'_>,
397        first: Option<u64>,
398        after: Option<balance::Cursor>,
399        last: Option<u64>,
400        before: Option<balance::Cursor>,
401    ) -> Result<Connection<String, Balance>> {
402        OwnerImpl::from(self)
403            .balances(ctx, first, after, last, before)
404            .await
405    }
406
407    /// The coin objects for this object.
408    ///
409    /// `type` is a filter on the coin's type parameter, defaulting to
410    /// `0x2::iota::IOTA`.
411    pub(crate) async fn coins(
412        &self,
413        ctx: &Context<'_>,
414        first: Option<u64>,
415        after: Option<Cursor>,
416        last: Option<u64>,
417        before: Option<Cursor>,
418        type_: Option<ExactTypeFilter>,
419    ) -> Result<Connection<String, Coin>> {
420        OwnerImpl::from(self)
421            .coins(ctx, first, after, last, before, type_)
422            .await
423    }
424
425    /// The `0x3::staking_pool::StakedIota` objects owned by this object.
426    pub(crate) async fn staked_iotas(
427        &self,
428        ctx: &Context<'_>,
429        first: Option<u64>,
430        after: Option<Cursor>,
431        last: Option<u64>,
432        before: Option<Cursor>,
433    ) -> Result<Connection<String, StakedIota>> {
434        OwnerImpl::from(self)
435            .staked_iotas(ctx, first, after, last, before)
436            .await
437    }
438
439    /// The name explicitly configured as the default name pointing to this
440    /// address.
441    pub(crate) async fn iota_names_default_name(
442        &self,
443        ctx: &Context<'_>,
444        format: Option<NameFormat>,
445    ) -> Result<Option<String>> {
446        OwnerImpl::from(self)
447            .iota_names_default_name(ctx, format)
448            .await
449    }
450
451    /// The NameRegistration NFTs owned by this address. These grant the
452    /// owner the capability to manage the associated name.
453    pub(crate) async fn iota_names_registrations(
454        &self,
455        ctx: &Context<'_>,
456        first: Option<u64>,
457        after: Option<Cursor>,
458        last: Option<u64>,
459        before: Option<Cursor>,
460    ) -> Result<Connection<String, NameRegistration>> {
461        OwnerImpl::from(self)
462            .iota_names_registrations(ctx, first, after, last, before)
463            .await
464    }
465
466    pub(crate) async fn version(&self) -> UInt53 {
467        ObjectImpl(self).version().await
468    }
469
470    /// The current status of the object as read from the off-chain store. The
471    /// possible states are:
472    /// - NOT_INDEXED: The object is loaded from serialized data, such as the
473    ///   contents of a genesis or system package upgrade transaction.
474    /// - INDEXED: The object is retrieved from the off-chain index and
475    ///   represents the most recent or historical state of the object.
476    /// - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
477    ///   information can be loaded.
478    pub(crate) async fn status(&self) -> ObjectStatus {
479        ObjectImpl(self).status().await
480    }
481
482    /// 32-byte hash that identifies the object's current contents, encoded as a
483    /// Base58 string.
484    pub(crate) async fn digest(&self) -> Option<String> {
485        ObjectImpl(self).digest().await
486    }
487
488    /// The owner type of this object: Immutable, Shared, Parent, Address
489    /// Immutable and Shared Objects do not have owners.
490    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
491        ObjectImpl(self).owner(ctx).await
492    }
493
494    /// The transaction block that created this version of the object.
495    pub(crate) async fn previous_transaction_block(
496        &self,
497        ctx: &Context<'_>,
498    ) -> Result<Option<TransactionBlock>> {
499        ObjectImpl(self).previous_transaction_block(ctx).await
500    }
501
502    /// The amount of IOTA we would rebate if this object gets deleted or
503    /// mutated. This number is recalculated based on the present storage
504    /// gas price.
505    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
506        ObjectImpl(self).storage_rebate().await
507    }
508
509    /// The transaction blocks that sent objects to this object.
510    ///
511    /// `scanLimit` restricts the number of candidate transactions scanned when
512    /// gathering a page of results. It is required for queries that apply
513    /// more than two complex filters (on function, kind, sender, recipient,
514    /// input object, changed object, or ids), and can be at most
515    /// `serviceConfig.maxScanLimit`.
516    ///
517    /// When the scan limit is reached the page will be returned even if it has
518    /// fewer than `first` results when paginating forward (`last` when
519    /// paginating backwards). If there are more transactions to scan,
520    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
521    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
522    /// to the last transaction that was scanned as opposed to the last (or
523    /// first) transaction in the page.
524    ///
525    /// Requesting the next (or previous) page after this cursor will resume the
526    /// search, scanning the next `scanLimit` many transactions in the
527    /// direction of pagination, and so on until all transactions in the
528    /// scanning range have been visited.
529    ///
530    /// By default, the scanning range includes all transactions known to
531    /// GraphQL, but it can be restricted by the `after` and `before`
532    /// cursors, and the `beforeCheckpoint`, `afterCheckpoint` and
533    /// `atCheckpoint` filters.
534    #[graphql(
535        complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
536    )]
537    pub(crate) async fn received_transaction_blocks(
538        &self,
539        ctx: &Context<'_>,
540        first: Option<u64>,
541        after: Option<transaction_block::Cursor>,
542        last: Option<u64>,
543        before: Option<transaction_block::Cursor>,
544        filter: Option<TransactionBlockFilter>,
545        scan_limit: Option<u64>,
546    ) -> Result<ScanConnection<String, TransactionBlock>> {
547        ObjectImpl(self)
548            .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
549            .await
550    }
551
552    /// The Base64-encoded BCS serialization of the object's content.
553    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
554        ObjectImpl(self).bcs().await
555    }
556
557    /// The set of named templates defined on-chain for the type of this object,
558    /// to be handled off-chain. The server substitutes data from the object
559    /// into these templates to generate a display string per template.
560    async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
561        ObjectImpl(self).display(ctx).await
562    }
563
564    /// Access a dynamic field on an object using its name. Names are arbitrary
565    /// Move values whose type have `copy`, `drop`, and `store`, and are
566    /// specified using their type, and their BCS contents, Base64 encoded.
567    ///
568    /// Dynamic fields on wrapped objects can be accessed by using the same API
569    /// under the Owner type.
570    async fn dynamic_field(
571        &self,
572        ctx: &Context<'_>,
573        name: DynamicFieldName,
574    ) -> Result<Option<DynamicField>> {
575        OwnerImpl::from(self)
576            .dynamic_field(ctx, name, Some(self.root_version()))
577            .await
578    }
579
580    /// Access a dynamic object field on an object using its name. Names are
581    /// arbitrary Move values whose type have `copy`, `drop`, and `store`,
582    /// and are specified using their type, and their BCS contents, Base64
583    /// encoded. The value of a dynamic object field can also be accessed
584    /// off-chain directly via its address (e.g. using `Query.object`).
585    ///
586    /// Dynamic fields on wrapped objects can be accessed by using the same API
587    /// under the Owner type.
588    async fn dynamic_object_field(
589        &self,
590        ctx: &Context<'_>,
591        name: DynamicFieldName,
592    ) -> Result<Option<DynamicField>> {
593        OwnerImpl::from(self)
594            .dynamic_object_field(ctx, name, Some(self.root_version()))
595            .await
596    }
597
598    /// The dynamic fields and dynamic object fields on an object.
599    ///
600    /// Dynamic fields on wrapped objects can be accessed by using the same API
601    /// under the Owner type.
602    async fn dynamic_fields(
603        &self,
604        ctx: &Context<'_>,
605        first: Option<u64>,
606        after: Option<Cursor>,
607        last: Option<u64>,
608        before: Option<Cursor>,
609    ) -> Result<Connection<String, DynamicField>> {
610        OwnerImpl::from(self)
611            .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
612            .await
613    }
614
615    /// Attempts to convert the object into a MoveObject
616    async fn as_move_object(&self) -> Option<MoveObject> {
617        MoveObject::try_from(self).ok()
618    }
619
620    /// Attempts to convert the object into a MovePackage
621    async fn as_move_package(&self) -> Option<MovePackage> {
622        MovePackage::try_from(self).ok()
623    }
624}
625
626impl ObjectImpl<'_> {
627    pub(crate) async fn version(&self) -> UInt53 {
628        self.0.version_impl().into()
629    }
630
631    pub(crate) async fn status(&self) -> ObjectStatus {
632        ObjectStatus::from(&self.0.kind)
633    }
634
635    pub(crate) async fn digest(&self) -> Option<String> {
636        self.0
637            .native_impl()
638            .map(|native| native.digest().to_base58())
639    }
640
641    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
642        use NativeOwner as O;
643
644        let native = self.0.native_impl()?;
645
646        match native.owner {
647            O::Address(address) => {
648                let address = IotaAddress::from(address);
649                Some(ObjectOwner::Address(AddressOwner {
650                    owner: Some(Owner {
651                        address,
652                        checkpoint_viewed_at: self.0.checkpoint_viewed_at,
653                        root_version: None,
654                    }),
655                }))
656            }
657            O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
658            O::Object(address) => {
659                let parent = Object::query(
660                    ctx,
661                    address.into(),
662                    Object::latest_at(self.0.checkpoint_viewed_at),
663                )
664                .await
665                .ok()
666                .flatten();
667
668                Some(ObjectOwner::Parent(Box::new(Parent { parent })))
669            }
670            O::Shared(initial_shared_version) => Some(ObjectOwner::Shared(Shared {
671                initial_shared_version: initial_shared_version.as_u64().into(),
672            })),
673            _ => unimplemented!("a new Owner enum variant was added and needs to be handled"),
674        }
675    }
676
677    pub(crate) async fn previous_transaction_block(
678        &self,
679        ctx: &Context<'_>,
680    ) -> Result<Option<TransactionBlock>> {
681        let Some(native) = self.0.native_impl() else {
682            return Ok(None);
683        };
684        let digest = native.previous_transaction;
685        let key = transaction_block::DigestKey::new(digest.into(), self.0.checkpoint_viewed_at);
686
687        TransactionBlock::query(ctx, key.into()).await.extend()
688    }
689
690    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
691        self.0
692            .native_impl()
693            .map(|native| BigInt::from(native.storage_rebate))
694    }
695
696    pub(crate) async fn received_transaction_blocks(
697        &self,
698        ctx: &Context<'_>,
699        first: Option<u64>,
700        after: Option<transaction_block::Cursor>,
701        last: Option<u64>,
702        before: Option<transaction_block::Cursor>,
703        filter: Option<TransactionBlockFilter>,
704        scan_limit: Option<u64>,
705    ) -> Result<ScanConnection<String, TransactionBlock>> {
706        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
707
708        let Some(filter) = filter
709            .unwrap_or_default()
710            .intersect(TransactionBlockFilter {
711                recv_address: Some(self.0.address),
712                ..Default::default()
713            })
714        else {
715            return Ok(ScanConnection::new(false, false));
716        };
717
718        TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
719            .await
720            .extend()
721    }
722
723    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
724        use ObjectKind as K;
725        Ok(match &self.0.kind {
726            K::WrappedOrDeleted(_) => None,
727            // WrappedOrDeleted objects are also read from the historical objects table, and they do
728            // not have a serialized object, so the column is also nullable for stored historical
729            // objects.
730            K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),
731
732            K::NotIndexed(native) => {
733                let bytes = bcs::to_bytes(native)
734                    .map_err(|e| {
735                        Error::Internal(format!(
736                            "Failed to serialize object at {}: {e}",
737                            self.0.address
738                        ))
739                    })
740                    .extend()?;
741                Some(Base64::from(&bytes))
742            }
743        })
744    }
745
746    /// `display` is part of the `IMoveObject` interface, but is implemented on
747    /// `ObjectImpl` to allow for a convenience function on `Object`.
748    pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
749        let Some(native) = self.0.native_impl() else {
750            return Ok(None);
751        };
752
753        let move_object = native
754            .data
755            .as_struct_opt()
756            .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
757            .extend()?;
758
759        let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
760            .await
761            .extend()?;
762
763        let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
764            .await
765            .extend()?
766        else {
767            return Ok(None);
768        };
769
770        Ok(Some(display.render(&move_struct).extend()?))
771    }
772}
773
774impl Object {
775    /// Construct a GraphQL object from a native object, without its stored
776    /// (indexed) counterpart.
777    ///
778    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
779    /// which this `Object` was constructed in. This is stored on `Object`
780    /// so that when viewing that entity's state, it will be as if it was
781    /// read at the same checkpoint.
782    ///
783    /// `root_version` represents the version of the root object in some nested
784    /// chain of dynamic fields. This should typically be left `None`,
785    /// unless the object(s) being resolved is a dynamic field, or if
786    /// `root_version` has been explicitly set for this object. If None, then
787    /// we use [`version_for_dynamic_fields`] to infer a root version to then
788    /// propagate from this object down to its dynamic fields.
789    pub(crate) fn from_native(
790        address: IotaAddress,
791        native: NativeObject,
792        checkpoint_viewed_at: u64,
793        root_version: Option<u64>,
794    ) -> Object {
795        let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
796        Object {
797            address,
798            kind: ObjectKind::NotIndexed(native),
799            checkpoint_viewed_at,
800            root_version,
801        }
802    }
803
804    pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
805        use ObjectKind as K;
806
807        match &self.kind {
808            K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
809            K::WrappedOrDeleted(_) => None,
810        }
811    }
812
813    pub(crate) fn version_impl(&self) -> u64 {
814        use ObjectKind as K;
815
816        match &self.kind {
817            K::NotIndexed(native) | K::Indexed(native, _) => native.version().as_u64(),
818            K::WrappedOrDeleted(object_version) => *object_version,
819        }
820    }
821
822    /// Root parent object version for dynamic fields.
823    ///
824    /// Check [`Object::root_version`] for details.
825    pub(crate) fn root_version(&self) -> u64 {
826        self.root_version
827    }
828
829    /// Query the database for a `page` of objects, optionally `filter`-ed.
830    ///
831    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
832    /// which this page was queried for. Each entity returned in the
833    /// connection will inherit this checkpoint, so that when viewing that
834    /// entity's state, it will be as if it was read at the same checkpoint.
835    pub(crate) async fn paginate(
836        db: &Db,
837        page: Page<Cursor>,
838        filter: ObjectFilter,
839        checkpoint_viewed_at: u64,
840    ) -> Result<Connection<String, Object>, Error> {
841        Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
842    }
843
844    /// Query the database for a `page` of some sub-type of Object. The page
845    /// uses the bytes of an Object ID and the checkpoint when the query was
846    /// made as the cursor, and can optionally be further `filter`-ed. The
847    /// subtype is created using the `downcast` function, which is allowed
848    /// to fail, if the downcast has failed.
849    ///
850    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
851    /// which this page was queried for. Each entity returned in the
852    /// connection will inherit this checkpoint, so that when viewing that
853    /// entity's state, it will be as if it was read at the same checkpoint.
854    ///
855    /// If a `Page<Cursor>` is also provided, then this function will defer to
856    /// the `checkpoint_viewed_at` in the cursors. Otherwise, use the value
857    /// from the parameter, or set to None. This is so that paginated
858    /// queries are consistent with the previous query that created the
859    /// cursor.
860    pub(crate) async fn paginate_subtype<T: OutputType>(
861        db: &Db,
862        page: Page<Cursor>,
863        filter: ObjectFilter,
864        checkpoint_viewed_at: u64,
865        downcast: impl Fn(Object) -> Result<T, Error>,
866    ) -> Result<Connection<String, T>, Error> {
867        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if
868        // they are consistent. Otherwise, use the value from the parameter, or
869        // set to None. This is so that paginated queries are consistent with
870        // the previous query that created the cursor.
871        let cursor_viewed_at = page.validate_cursor_consistency()?;
872        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
873
874        let max_available_range = db.max_available_range;
875
876        let Some((prev, next, results)) = db
877            .execute_repeatable(move |conn| {
878                if !AvailableRange::is_checkpoint_in_backward_history_range(
879                    conn,
880                    checkpoint_viewed_at,
881                    max_available_range,
882                )? {
883                    return Ok::<_, diesel::result::Error>(None);
884                };
885
886                let (prev, next, results_iter) = page.paginate_raw_query::<StoredBackwardObject>(
887                    conn,
888                    checkpoint_viewed_at,
889                    backward_objects_query(&filter, checkpoint_viewed_at, &page),
890                )?;
891                let results = results_iter.collect();
892                let results = resolve_tombstone_versions(conn, results)?;
893                Ok(Some((prev, next, results)))
894            })
895            .await?
896        else {
897            return Err(Error::Client(
898                "Requested data is outside the available range".to_string(),
899            ));
900        };
901
902        let mut conn: Connection<String, T> = Connection::new(prev, next);
903
904        for stored in results {
905            // To maintain consistency, the returned cursor should have the same upper-bound
906            // as the checkpoint found on the cursor.
907            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
908            let stored_history = stored.into_stored_history(checkpoint_viewed_at);
909            let object =
910                Object::try_from_stored_history_object(stored_history, checkpoint_viewed_at, None)?;
911            conn.edges.push(Edge::new(cursor, downcast(object)?));
912        }
913
914        Ok(conn)
915    }
916
917    /// Look-up the latest version of the object as of a given checkpoint.
918    pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
919        ObjectLookup::LatestAt {
920            checkpoint_viewed_at,
921        }
922    }
923
924    /// Look-up the latest version of an object whose version is less than or
925    /// equal to its parent's version, as of a given checkpoint.
926    pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
927        ObjectLookup::UnderParent {
928            parent_version,
929            checkpoint_viewed_at,
930        }
931    }
932
933    /// Look-up a specific version of the object, as of a given checkpoint.
934    pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
935        ObjectLookup::VersionAt {
936            version,
937            checkpoint_viewed_at,
938        }
939    }
940
941    /// Look-up a specific version of the object from optimistic transactions.
942    pub(crate) fn at_optimistic_version(version: u64) -> ObjectLookup {
943        ObjectLookup::OptimisticVersion { version }
944    }
945
946    pub(crate) async fn query(
947        ctx: &Context<'_>,
948        id: IotaAddress,
949        key: ObjectLookup,
950    ) -> Result<Option<Self>, Error> {
951        let DataLoader(loader) = &ctx.data_unchecked();
952
953        match key {
954            ObjectLookup::VersionAt {
955                version,
956                checkpoint_viewed_at,
957            } => {
958                loader
959                    .load_one(HistoricalKey {
960                        id,
961                        version,
962                        checkpoint_viewed_at,
963                    })
964                    .await
965            }
966
967            ObjectLookup::OptimisticVersion { version } => {
968                loader.load_one(OptimisticKey { id, version }).await
969            }
970
971            ObjectLookup::UnderParent {
972                parent_version,
973                checkpoint_viewed_at,
974            } => {
975                loader
976                    .load_one(ParentVersionKey {
977                        id,
978                        parent_version,
979                        checkpoint_viewed_at,
980                    })
981                    .await
982            }
983
984            ObjectLookup::LatestAt {
985                checkpoint_viewed_at,
986            } => {
987                loader
988                    .load_one(LatestAtKey {
989                        id,
990                        checkpoint_viewed_at,
991                    })
992                    .await
993            }
994        }
995    }
996
997    /// Query for a singleton object identified by its type. Note: the object is
998    /// assumed to be a singleton (we either find at least one object with
999    /// this type and then return it, or return nothing).
1000    pub(crate) async fn query_singleton(
1001        db: &Db,
1002        type_: StructTag,
1003        checkpoint_viewed_at: u64,
1004    ) -> Result<Option<Object>, Error> {
1005        let filter = ObjectFilter {
1006            type_: Some(TypeFilter::ByType(type_)),
1007            ..Default::default()
1008        };
1009
1010        let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;
1011
1012        Ok(connection.edges.into_iter().next().map(|edge| edge.node))
1013    }
1014
1015    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
1016    /// which this `Object` was constructed in. This is stored on `Object`
1017    /// so that when viewing that entity's state, it will be as if it was
1018    /// read at the same checkpoint.
1019    ///
1020    /// `root_version` represents the version of the root object in some nested
1021    /// chain of dynamic fields. This should typically be left `None`,
1022    /// unless the object(s) being resolved is a dynamic field, or if
1023    /// `root_version` has been explicitly set for this object. If None, then
1024    /// we use [`version_for_dynamic_fields`] to infer a root version to then
1025    /// propagate from this object down to its dynamic fields.
1026    pub(crate) fn try_from_stored_history_object(
1027        history_object: StoredHistoryObject,
1028        checkpoint_viewed_at: u64,
1029        root_version: Option<u64>,
1030    ) -> Result<Self, Error> {
1031        let address = addr(&history_object.object_id)?;
1032
1033        let object_status =
1034            NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
1035                Error::Internal(format!(
1036                    "Unknown object status {} for object {} at version {}",
1037                    history_object.object_status, address, history_object.object_version
1038                ))
1039            })?;
1040
1041        match object_status {
1042            NativeObjectStatus::Active => {
1043                let Some(serialized_object) = &history_object.serialized_object else {
1044                    return Err(Error::Internal(format!(
1045                        "Live object {} at version {} cannot have missing serialized_object field",
1046                        address, history_object.object_version
1047                    )));
1048                };
1049
1050                let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
1051                    Error::Internal(format!("Failed to deserialize object {address}"))
1052                })?;
1053
1054                let root_version =
1055                    root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
1056                Ok(Self {
1057                    address,
1058                    kind: ObjectKind::Indexed(native_object, history_object),
1059                    checkpoint_viewed_at,
1060                    root_version,
1061                })
1062            }
1063            NativeObjectStatus::WrappedOrDeleted => Ok(Self {
1064                address,
1065                kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
1066                checkpoint_viewed_at,
1067                root_version: history_object.object_version as u64,
1068            }),
1069        }
1070    }
1071
1072    pub(crate) fn try_from_stored_object(
1073        stored_object: StoredObject,
1074        checkpoint_viewed_at: u64,
1075    ) -> Result<Self, Error> {
1076        let address = addr(&stored_object.object_id)?;
1077
1078        let native_object = bcs::from_bytes(&stored_object.serialized_object)
1079            .map_err(|_| Error::Internal(format!("Failed to deserialize object {address}")))?;
1080
1081        let root_version = version_for_dynamic_fields(&native_object);
1082
1083        let stored_history_like = StoredHistoryObject {
1084            object_id: stored_object.object_id,
1085            object_version: stored_object.object_version,
1086            object_digest: Some(stored_object.object_digest),
1087            object_status: NativeObjectStatus::Active as i16,
1088            checkpoint_sequence_number: checkpoint_viewed_at as i64,
1089            serialized_object: Some(stored_object.serialized_object),
1090            object_type: stored_object.object_type,
1091            object_type_package: stored_object.object_type_package,
1092            object_type_module: stored_object.object_type_module,
1093            object_type_name: stored_object.object_type_name,
1094            owner_type: Some(stored_object.owner_type),
1095            owner_id: stored_object.owner_id,
1096            coin_type: stored_object.coin_type,
1097            coin_balance: stored_object.coin_balance,
1098            df_kind: stored_object.df_kind,
1099        };
1100
1101        Ok(Self {
1102            address,
1103            kind: ObjectKind::Indexed(native_object, stored_history_like),
1104            checkpoint_viewed_at,
1105            root_version,
1106        })
1107    }
1108}
1109
1110/// We're deliberately choosing to use a child object's version as the root
1111/// here, and letting the caller override it with the actual root object's
1112/// version if it has access to it.
1113///
1114/// Using the child object's version as the root means that we're seeing the
1115/// dynamic field tree under this object at the state resulting from the
1116/// transaction that produced this version.
1117///
1118/// See [`Object::root_version`] for more details on parent/child object version
1119/// mechanics.
1120fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
1121    native.as_inner().version().as_u64()
1122}
1123
1124impl ObjectFilter {
1125    /// Try to create a filter whose results are the intersection of objects in
1126    /// `self`'s results and objects in `other`'s results. This may not be
1127    /// possible if the resulting filter is inconsistent in some way (e.g. a
1128    /// filter that requires one field to be two different values
1129    /// simultaneously).
1130    pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
1131        macro_rules! intersect {
1132            ($field:ident, $body:expr) => {
1133                intersect::field(self.$field, other.$field, $body)
1134            };
1135        }
1136
1137        // Treat `object_ids` and `object_keys` as a single filter on IDs, and
1138        // optionally versions, and compute the intersection of that.
1139        let keys = intersect::field(self.keys(), other.keys(), |k, l| {
1140            let mut combined = BTreeMap::new();
1141
1142            for (id, v) in k {
1143                if let Some(w) = l.get(&id).copied() {
1144                    combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
1145                }
1146            }
1147
1148            // If the intersection is empty, it means, there were some ID or Key filters in
1149            // both `self` and `other`, but they don't overlap, so the final
1150            // result is inconsistent.
1151            (!combined.is_empty()).then_some(combined)
1152        })?;
1153
1154        // Extract the ID and Key filters back out. At this point, we know that if there
1155        // were ID/Key filters in both `self` and `other`, then they intersected
1156        // to form a consistent set of constraints, so it is safe to interpret
1157        // the lack of any ID/Key filters respectively as a lack of that kind of
1158        // constraint, rather than a constraint on the empty set.
1159
1160        let object_ids = {
1161            let partition: Vec<_> = keys
1162                .iter()
1163                .flatten()
1164                .filter_map(|(id, v)| v.is_none().then_some(*id))
1165                .collect();
1166
1167            (!partition.is_empty()).then_some(partition)
1168        };
1169
1170        let object_keys = {
1171            let partition: Vec<_> = keys
1172                .iter()
1173                .flatten()
1174                .filter_map(|(id, v)| {
1175                    Some(ObjectKey {
1176                        object_id: *id,
1177                        version: (*v)?.into(),
1178                    })
1179                })
1180                .collect();
1181
1182            (!partition.is_empty()).then_some(partition)
1183        };
1184
1185        Some(Self {
1186            type_: intersect!(type_, TypeFilter::intersect)?,
1187            owner: intersect!(owner, intersect::by_eq)?,
1188            object_ids,
1189            object_keys,
1190        })
1191    }
1192
1193    /// Extract the Object ID and Key filters into one combined map from Object
1194    /// IDs in this filter, to the versions they should have (or None if the
1195    /// filter mentions the ID but no version for it).
1196    fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
1197        if self.object_keys.is_none() && self.object_ids.is_none() {
1198            return None;
1199        }
1200
1201        Some(BTreeMap::from_iter(
1202            self.object_keys
1203                .iter()
1204                .flatten()
1205                .map(|key| (key.object_id, Some(key.version.into())))
1206                // Chain ID filters after Key filters so if there is overlap, we overwrite the key
1207                // filter with the ID filter.
1208                .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
1209        ))
1210    }
1211
1212    /// Applies ObjectFilter to the input `RawQuery` and returns a new
1213    /// `RawQuery`.
1214    pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
1215        // Start by applying the filters on IDs and/or keys because they are combined as
1216        // a disjunction, while the remaining queries are conjunctions.
1217        if let Some(object_ids) = &self.object_ids {
1218            // Maximally strict - match a vec of 0 elements
1219            if object_ids.is_empty() {
1220                query = or_filter!(query, "1=0");
1221            } else {
1222                let mut inner = String::new();
1223                let mut prefix = "object_id IN (";
1224                for id in object_ids {
1225                    // SAFETY: Writing to a `String` cannot fail.
1226                    write!(
1227                        &mut inner,
1228                        "{prefix}'\\x{}'::bytea",
1229                        hex::encode(id.into_vec())
1230                    )
1231                    .unwrap();
1232                    prefix = ", ";
1233                }
1234                inner.push(')');
1235                query = or_filter!(query, inner);
1236            }
1237        }
1238
1239        if let Some(object_keys) = &self.object_keys {
1240            // Maximally strict - match a vec of 0 elements
1241            if object_keys.is_empty() {
1242                query = or_filter!(query, "1=0");
1243            } else {
1244                let mut inner = String::new();
1245                let mut prefix = "(";
1246                for ObjectKey { object_id, version } in object_keys {
1247                    // SAFETY: Writing to a `String` cannot fail.
1248                    write!(
1249                        &mut inner,
1250                        "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
1251                        hex::encode(object_id.into_vec()),
1252                        version
1253                    )
1254                    .unwrap();
1255                    prefix = " OR ";
1256                }
1257                inner.push(')');
1258                query = or_filter!(query, inner);
1259            }
1260        }
1261
1262        if let Some(owner) = self.owner {
1263            query = filter!(
1264                query,
1265                format!(
1266                    "owner_id = '\\x{}'::bytea AND owner_type = {}",
1267                    hex::encode(owner.into_vec()),
1268                    OwnerType::Address as i16
1269                )
1270            );
1271        }
1272
1273        if let Some(type_) = &self.type_ {
1274            return type_.apply_raw(
1275                query,
1276                "object_type",
1277                "object_type_package",
1278                "object_type_module",
1279                "object_type_name",
1280            );
1281        }
1282
1283        query
1284    }
1285}
1286
1287impl HistoricalObjectCursor {
1288    pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
1289        Self {
1290            object_id,
1291            checkpoint_viewed_at,
1292        }
1293    }
1294}
1295
1296impl Checkpointed for Cursor {
1297    fn checkpoint_viewed_at(&self) -> u64 {
1298        self.checkpoint_viewed_at
1299    }
1300}
1301
// No trait items are overridden here, so `Cursor` relies entirely on whatever
// defaults `ScanLimited` declares.
impl ScanLimited for Cursor {}
1303
1304impl RawPaginated<Cursor> for StoredHistoryObject {
1305    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1306        filter!(
1307            query,
1308            format!(
1309                "candidates.object_id >= '\\x{}'::bytea",
1310                hex::encode(cursor.object_id.clone())
1311            )
1312        )
1313    }
1314
1315    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1316        filter!(
1317            query,
1318            format!(
1319                "candidates.object_id <= '\\x{}'::bytea",
1320                hex::encode(cursor.object_id.clone())
1321            )
1322        )
1323    }
1324
1325    fn order(asc: bool, query: RawQuery) -> RawQuery {
1326        if asc {
1327            query.order_by("candidates.object_id ASC")
1328        } else {
1329            query.order_by("candidates.object_id DESC")
1330        }
1331    }
1332}
1333
1334impl Target<Cursor> for StoredHistoryObject {
1335    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1336        Cursor::new(HistoricalObjectCursor::new(
1337            self.object_id.clone(),
1338            checkpoint_viewed_at,
1339        ))
1340    }
1341}
1342
1343/// Query-result struct for the backward diff read path. Used by
1344/// `build_backward_objects_query` which unions `checkpointed_objects` and
1345/// `objects_backward_history`. Has explicit `sql_type` annotations because
1346/// there is no single backing Diesel table definition.
1347#[derive(diesel::QueryableByName, Clone, Debug)]
1348pub(crate) struct StoredBackwardObject {
1349    #[diesel(sql_type = sql_types::Binary)]
1350    pub object_id: Vec<u8>,
1351    #[diesel(sql_type = sql_types::BigInt)]
1352    pub object_version: i64,
1353    #[diesel(sql_type = sql_types::SmallInt)]
1354    pub object_status: i16,
1355    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
1356    pub object_digest: Option<Vec<u8>>,
1357    #[diesel(sql_type = sql_types::Nullable<sql_types::SmallInt>)]
1358    pub owner_type: Option<i16>,
1359    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
1360    pub owner_id: Option<Vec<u8>>,
1361    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
1362    pub object_type: Option<String>,
1363    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
1364    pub object_type_package: Option<Vec<u8>>,
1365    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
1366    pub object_type_module: Option<String>,
1367    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
1368    pub object_type_name: Option<String>,
1369    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
1370    pub serialized_object: Option<Vec<u8>>,
1371    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
1372    pub coin_type: Option<String>,
1373    #[diesel(sql_type = sql_types::Nullable<sql_types::BigInt>)]
1374    pub coin_balance: Option<i64>,
1375    #[diesel(sql_type = sql_types::Nullable<sql_types::SmallInt>)]
1376    pub df_kind: Option<i16>,
1377    /// `TRUE` when the row came from `objects_backward_history`, `FALSE`
1378    /// otherwise (from `checkpointed_objects` or `objects_version`). Used to
1379    /// decide whether version correction is needed for `WrappedOrDeleted`
1380    /// entries, since backward history stores a lamport-1 approximation.
1381    #[diesel(sql_type = sql_types::Bool)]
1382    pub from_backward_history: bool,
1383}
1384
1385impl StoredBackwardObject {
1386    /// Convert into a `StoredHistoryObject`, filling in
1387    /// `checkpoint_sequence_number` with the checkpoint the object was
1388    /// viewed at. The backward diff query guarantees the result is valid at
1389    /// this checkpoint, so it's the most accurate value we have.
1390    pub(crate) fn into_stored_history(self, checkpoint_viewed_at: u64) -> StoredHistoryObject {
1391        StoredHistoryObject {
1392            object_id: self.object_id,
1393            object_version: self.object_version,
1394            object_status: self.object_status,
1395            object_digest: self.object_digest,
1396            checkpoint_sequence_number: checkpoint_viewed_at as i64,
1397            owner_type: self.owner_type,
1398            owner_id: self.owner_id,
1399            object_type: self.object_type,
1400            object_type_package: self.object_type_package,
1401            object_type_module: self.object_type_module,
1402            object_type_name: self.object_type_name,
1403            serialized_object: self.serialized_object,
1404            coin_type: self.coin_type,
1405            coin_balance: self.coin_balance,
1406            df_kind: self.df_kind,
1407        }
1408    }
1409}
1410
1411/// Resolves real tombstone versions for `WrappedOrDeleted` entries from
1412/// `objects_backward_history`.
1413///
1414/// The backward history stores a lamport-1 version approximation which may be
1415/// higher than the actual tombstone version. This function looks up the real
1416/// version from `objects_version` using a single batch query that unnests
1417/// bound `bytea[]` / `bigint[]` parameter arrays into `(object_id, version)`
1418/// pairs and joins them via `MAX(object_version) <= backward_history_version`.
1419/// Only entries tagged with `from_backward_history = true` are resolved;
1420/// entries from `checkpointed_objects` already have the correct version.
1421pub(crate) fn resolve_tombstone_versions(
1422    conn: &mut crate::data::pg::PgConnection<'_>,
1423    results: Vec<StoredBackwardObject>,
1424) -> Result<Vec<StoredBackwardObject>, diesel::result::Error> {
1425    let (ids, versions): (Vec<Vec<u8>>, Vec<i64>) = results
1426        .iter()
1427        .filter(|r| {
1428            r.from_backward_history
1429                && r.object_status == NativeObjectStatus::WrappedOrDeleted as i16
1430        })
1431        .map(|r| (r.object_id.clone(), r.object_version))
1432        .unzip();
1433
1434    if ids.is_empty() {
1435        return Ok(results);
1436    }
1437
1438    // Bound `unnest` arrays (rather than an inlined `VALUES` list) keep the
1439    // SQL text constant across calls so Postgres can reuse a cached plan,
1440    // and skip the parser cost of every `::bytea` / `::bigint` cast.
1441    let sql = "SELECT pairs.object_id, pairs.backward_history_version, \
1442                      MAX(ov.object_version) AS real_version \
1443               FROM unnest($1::bytea[], $2::bigint[]) \
1444                    AS pairs(object_id, backward_history_version) \
1445               LEFT JOIN objects_version ov \
1446                 ON ov.object_id = pairs.object_id \
1447                AND ov.object_version <= pairs.backward_history_version \
1448               GROUP BY pairs.object_id, pairs.backward_history_version";
1449
1450    #[derive(diesel::QueryableByName)]
1451    struct ResolvedVersion {
1452        #[diesel(sql_type = sql_types::Binary)]
1453        object_id: Vec<u8>,
1454        #[diesel(sql_type = sql_types::BigInt)]
1455        backward_history_version: i64,
1456        #[diesel(sql_type = sql_types::Nullable<sql_types::BigInt>)]
1457        real_version: Option<i64>,
1458    }
1459
1460    let rows: Vec<ResolvedVersion> = conn.results(|| {
1461        diesel::sql_query(sql)
1462            .bind::<sql_types::Array<sql_types::Binary>, _>(ids.clone())
1463            .bind::<sql_types::Array<sql_types::BigInt>, _>(versions.clone())
1464    })?;
1465
1466    // Key by (object_id, backward_history_version) → real_version
1467    let resolved_map: HashMap<Vec<u8>, HashMap<i64, i64>> = rows
1468        .into_iter()
1469        .filter_map(|r| {
1470            r.real_version
1471                .map(|real| (r.object_id, r.backward_history_version, real))
1472        })
1473        .fold(HashMap::new(), |mut acc, (id, ver, real)| {
1474            acc.entry(id).or_default().insert(ver, real);
1475            acc
1476        });
1477
1478    Ok(results
1479        .into_iter()
1480        .map(|mut r| {
1481            if r.from_backward_history
1482                && r.object_status == NativeObjectStatus::WrappedOrDeleted as i16
1483            {
1484                if let Some(&real_version) = resolved_map
1485                    .get(&r.object_id)
1486                    .and_then(|versions| versions.get(&r.object_version))
1487                {
1488                    r.object_version = real_version;
1489                }
1490            }
1491            r
1492        })
1493        .collect())
1494}
1495
1496impl RawPaginated<Cursor> for StoredBackwardObject {
1497    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1498        filter!(
1499            query,
1500            format!(
1501                "candidates.object_id >= '\\x{}'::bytea",
1502                hex::encode(cursor.object_id.clone())
1503            )
1504        )
1505    }
1506
1507    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1508        filter!(
1509            query,
1510            format!(
1511                "candidates.object_id <= '\\x{}'::bytea",
1512                hex::encode(cursor.object_id.clone())
1513            )
1514        )
1515    }
1516
1517    fn order(asc: bool, query: RawQuery) -> RawQuery {
1518        if asc {
1519            query.order_by("candidates.object_id ASC")
1520        } else {
1521            query.order_by("candidates.object_id DESC")
1522        }
1523    }
1524}
1525
1526impl Target<Cursor> for StoredBackwardObject {
1527    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1528        Cursor::new(HistoricalObjectCursor::new(
1529            self.object_id.clone(),
1530            checkpoint_viewed_at,
1531        ))
1532    }
1533}
1534
1535impl Loader<HistoricalKey> for Db {
1536    type Value = Object;
1537    type Error = Error;
1538
1539    async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
1540        use objects_history::dsl as h;
1541        use objects_version::dsl as v;
1542
1543        if keys.is_empty() {
1544            return Ok(HashMap::new());
1545        }
1546
1547        let id_versions: BTreeSet<_> = keys
1548            .iter()
1549            .map(|key| (key.id.into_vec(), key.version as i64))
1550            .collect();
1551
1552        let objects: Vec<StoredHistoryObject> = self
1553            .execute(move |conn| {
1554                conn.results(move || {
1555                    let mut query = h::objects_history
1556                        .inner_join(
1557                            v::objects_version.on(v::cp_sequence_number
1558                                .eq(h::checkpoint_sequence_number)
1559                                .and(v::object_id.eq(h::object_id))
1560                                .and(v::object_version.eq(h::object_version))),
1561                        )
1562                        .select(StoredHistoryObject::as_select())
1563                        .into_boxed();
1564
1565                    for (id, version) in id_versions.iter().cloned() {
1566                        query =
1567                            query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
1568                    }
1569
1570                    query
1571                })
1572            })
1573            .await
1574            .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;
1575
1576        let mut id_version_to_stored = BTreeMap::new();
1577        for stored in objects {
1578            let key = (addr(&stored.object_id)?, stored.object_version as u64);
1579            id_version_to_stored.insert(key, stored);
1580        }
1581
1582        let mut result = HashMap::new();
1583        for key in keys {
1584            let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
1585                continue;
1586            };
1587
1588            // Filter by key's checkpoint viewed at here. Doing this in memory because it
1589            // should be quite rare that this query actually filters something,
1590            // but encoding it in SQL is complicated.
1591            if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1592                continue;
1593            }
1594
1595            let object = Object::try_from_stored_history_object(
1596                stored.clone(),
1597                key.checkpoint_viewed_at,
1598                // This conversion will use the object's own version as the `Object::root_version`.
1599                None,
1600            )?;
1601            result.insert(*key, object);
1602        }
1603
1604        Ok(result)
1605    }
1606}
1607
1608impl Loader<OptimisticKey> for Db {
1609    type Value = Object;
1610    type Error = Error;
1611
1612    async fn load(&self, keys: &[OptimisticKey]) -> Result<HashMap<OptimisticKey, Object>, Error> {
1613        use objects::dsl as o;
1614
1615        if keys.is_empty() {
1616            return Ok(HashMap::new());
1617        }
1618
1619        let id_versions: BTreeSet<_> = keys
1620            .iter()
1621            .map(|key| (key.id.into_vec(), key.version as i64))
1622            .collect();
1623
1624        let objects: Vec<StoredObject> = self
1625            .execute(move |conn| {
1626                conn.results(move || {
1627                    let mut query = o::objects.select(StoredObject::as_select()).into_boxed();
1628                    for (id, version) in id_versions.iter().cloned() {
1629                        query =
1630                            query.or_filter(o::object_id.eq(id).and(o::object_version.eq(version)));
1631                    }
1632                    query
1633                })
1634            })
1635            .await
1636            .map_err(|e| Error::Internal(format!("Failed to fetch optimistic objects: {e}")))?;
1637
1638        let mut result = HashMap::new();
1639        let id_version_to_stored = objects
1640            .into_iter()
1641            .map(|stored| {
1642                addr(&stored.object_id).map(|id| ((id, stored.object_version as u64), stored))
1643            })
1644            .collect::<Result<BTreeMap<_, _>, _>>()?;
1645
1646        // Collect keys that were not found in objects table
1647        let mut missing_keys = Vec::new();
1648        for key in keys {
1649            if let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) {
1650                let object = Object::try_from_stored_object(stored.clone(), u64::MAX)?;
1651                result.insert(*key, object);
1652            } else {
1653                missing_keys.push(*key);
1654            }
1655        }
1656
1657        // For missing keys, fallback to using the objects_history table
1658        if !missing_keys.is_empty() {
1659            let historical_keys: Vec<HistoricalKey> = missing_keys
1660                .iter()
1661                .map(|key| HistoricalKey {
1662                    id: key.id,
1663                    version: key.version,
1664                    checkpoint_viewed_at: u64::MAX,
1665                })
1666                .collect();
1667
1668            let historical_result: HashMap<HistoricalKey, Object> =
1669                self.load(&historical_keys).await?;
1670
1671            for (historical_key, object) in historical_result {
1672                let optimistic_key = OptimisticKey {
1673                    id: historical_key.id,
1674                    version: historical_key.version,
1675                };
1676                result.insert(optimistic_key, object);
1677            }
1678        }
1679
1680        Ok(result)
1681    }
1682}
1683
1684impl Loader<ParentVersionKey> for Db {
1685    type Value = Object;
1686    type Error = Error;
1687
1688    async fn load(
1689        &self,
1690        keys: &[ParentVersionKey],
1691    ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
1692        // Group keys by checkpoint viewed at and parent version -- we'll issue a
1693        // separate query for each group.
1694        #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
1695        struct GroupKey {
1696            checkpoint_viewed_at: u64,
1697            parent_version: u64,
1698        }
1699
1700        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1701        for key in keys {
1702            let group_key = GroupKey {
1703                checkpoint_viewed_at: key.checkpoint_viewed_at,
1704                parent_version: key.parent_version,
1705            };
1706
1707            keys_by_cursor_and_parent_version
1708                .entry(group_key)
1709                .or_default()
1710                .insert(key.id.into_vec());
1711        }
1712
1713        // Issue concurrent reads for each group of keys.
1714        let futures = keys_by_cursor_and_parent_version
1715            .into_iter()
1716            .map(|(group_key, ids)| {
1717                self.execute(move |conn| {
1718                    let stored: Vec<StoredHistoryObject> = conn.results(move || {
1719                        use objects_history::dsl as h;
1720                        use objects_version::dsl as v;
1721
1722                        h::objects_history
1723                            .inner_join(
1724                                v::objects_version.on(v::cp_sequence_number
1725                                    .eq(h::checkpoint_sequence_number)
1726                                    .and(v::object_id.eq(h::object_id))
1727                                    .and(v::object_version.eq(h::object_version))),
1728                            )
1729                            .select(StoredHistoryObject::as_select())
1730                            .filter(v::object_id.eq_any(ids.iter().cloned()))
1731                            .filter(v::object_version.le(group_key.parent_version as i64))
1732                            .distinct_on(v::object_id)
1733                            .order_by(v::object_id)
1734                            .then_order_by(v::object_version.desc())
1735                            .into_boxed()
1736                    })?;
1737
1738                    Ok::<_, diesel::result::Error>(
1739                        stored
1740                            .into_iter()
1741                            .map(|stored| (group_key, stored))
1742                            .collect::<Vec<_>>(),
1743                    )
1744                })
1745            });
1746
1747        // Wait for the reads to all finish, and gather them into the result map.
1748        let groups = futures::future::join_all(futures).await;
1749
1750        let mut results = HashMap::new();
1751        for group in groups {
1752            for (group_key, stored) in
1753                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1754            {
1755                // This particular object is invalid -- it didn't exist at the checkpoint we are
1756                // viewing at.
1757                if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1758                    continue;
1759                }
1760
1761                let object = Object::try_from_stored_history_object(
1762                    stored,
1763                    group_key.checkpoint_viewed_at,
1764                    // If `LatestAtKey::parent_version` is set, it must have been correctly
1765                    // propagated from the `Object::root_version` of some object.
1766                    Some(group_key.parent_version),
1767                )?;
1768
1769                let key = ParentVersionKey {
1770                    id: object.address,
1771                    checkpoint_viewed_at: group_key.checkpoint_viewed_at,
1772                    parent_version: group_key.parent_version,
1773                };
1774
1775                results.insert(key, object);
1776            }
1777        }
1778
1779        Ok(results)
1780    }
1781}
1782
1783impl Loader<LatestAtKey> for Db {
1784    type Value = Object;
1785    type Error = Error;
1786
1787    async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
1788        // Group keys by checkpoint viewed at -- we'll issue a separate query for each
1789        // group.
1790        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1791
1792        for key in keys {
1793            keys_by_cursor_and_parent_version
1794                .entry(key.checkpoint_viewed_at)
1795                .or_default()
1796                .insert(key.id);
1797        }
1798
1799        let max_available_range = self.max_available_range;
1800
1801        // Issue concurrent reads for each group of keys.
1802        let futures =
1803            keys_by_cursor_and_parent_version
1804                .into_iter()
1805                .map(|(checkpoint_viewed_at, ids)| {
1806                    self.execute_repeatable(move |conn| {
1807                        if !AvailableRange::is_checkpoint_in_backward_history_range(
1808                            conn,
1809                            checkpoint_viewed_at,
1810                            max_available_range,
1811                        )? {
1812                            return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
1813                                vec![],
1814                            );
1815                        };
1816
1817                        let filter = ObjectFilter {
1818                            object_ids: Some(ids.iter().cloned().collect()),
1819                            ..Default::default()
1820                        };
1821
1822                        let results: Vec<StoredBackwardObject> = conn.results(move || {
1823                            consistent::query(
1824                                checkpoint_viewed_at,
1825                                &Page::bounded(ids.len() as u64),
1826                                |q| filter.apply(q),
1827                            )
1828                            .into_boxed()
1829                        })?;
1830                        let results = resolve_tombstone_versions(conn, results)?;
1831
1832                        Ok(results
1833                            .into_iter()
1834                            .map(|r| {
1835                                (
1836                                    checkpoint_viewed_at,
1837                                    r.into_stored_history(checkpoint_viewed_at),
1838                                )
1839                            })
1840                            .collect())
1841                    })
1842                });
1843
1844        // Wait for the reads to all finish, and gather them into the result map.
1845        let groups = futures::future::join_all(futures).await;
1846
1847        let mut results = HashMap::new();
1848        for group in groups {
1849            for (checkpoint_viewed_at, stored) in
1850                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1851            {
1852                let object =
1853                    Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
1854
1855                let key = LatestAtKey {
1856                    id: object.address,
1857                    checkpoint_viewed_at,
1858                };
1859
1860                results.insert(key, object);
1861            }
1862        }
1863
1864        Ok(results)
1865    }
1866}
1867
1868impl From<&ObjectKind> for ObjectStatus {
1869    fn from(kind: &ObjectKind) -> Self {
1870        match kind {
1871            ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
1872            ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
1873            ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
1874        }
1875    }
1876}
1877
1878impl From<&Object> for OwnerImpl {
1879    fn from(object: &Object) -> Self {
1880        OwnerImpl {
1881            address: object.address,
1882            checkpoint_viewed_at: object.checkpoint_viewed_at,
1883        }
1884    }
1885}
1886
1887pub(crate) async fn deserialize_move_struct(
1888    move_object: &NativeMoveObject,
1889    resolver: &PackageResolver,
1890) -> Result<(StructTag, MoveStruct), Error> {
1891    let struct_tag = move_object.struct_tag().clone();
1892    let contents = move_object.contents();
1893    let move_type_layout = resolver
1894        .type_layout(TypeTag::from(struct_tag.clone()))
1895        .await
1896        .map_err(|e| {
1897            Error::Internal(format!(
1898                "Error fetching layout for type {}: {e}",
1899                struct_tag.to_canonical_string(/* with_prefix */ true)
1900            ))
1901        })?;
1902
1903    let MoveTypeLayout::Struct(layout) = move_type_layout else {
1904        return Err(Error::Internal("Object is not a move struct".to_string()));
1905    };
1906
1907    // TODO (annotated-visitor): Use custom visitors for extracting a dynamic field,
1908    // and for creating a GraphQL MoveValue directly (not via an annotated
1909    // visitor).
1910    let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
1911        Error::Internal(format!(
1912            "Error deserializing move struct for type {}: {e}",
1913            struct_tag.to_canonical_string(/* with_prefix */ true)
1914        ))
1915    })?;
1916
1917    Ok((struct_tag, move_struct))
1918}
1919
1920/// Constructs a backward diff query for objects.
1921///
1922/// Uses consistent view for most queries to ensure point-in-time correctness.
1923/// Falls back to historical view only for object-key lookups (specific
1924/// id+version pairs) which don't need consistency filtering. When both
1925/// `object_ids` and `object_keys` are provided, the results from both views
1926/// are unioned.
1927fn backward_objects_query(
1928    filter: &ObjectFilter,
1929    checkpoint_viewed_at: u64,
1930    page: &Page<Cursor>,
1931) -> RawQuery {
1932    if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
1933        // If both object IDs and object keys are specified, then we need to query in
1934        // both historical and consistent views, and then union the results.
1935        let ids_only_filter = ObjectFilter {
1936            object_keys: None,
1937            ..filter.clone()
1938        };
1939        let (id_query, id_bindings) = consistent::query(checkpoint_viewed_at, page, move |query| {
1940            ids_only_filter.apply(query)
1941        })
1942        .finish();
1943
1944        let keys_filter: HistoricalFilter = ObjectFilter {
1945            object_ids: None,
1946            ..filter.clone()
1947        }
1948        .try_into()
1949        .expect("object_keys is Some by match-arm guard");
1950        let (key_query, key_bindings) =
1951            historical::query(checkpoint_viewed_at, page, &keys_filter).finish();
1952
1953        RawQuery::new(
1954            format!("SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates",),
1955            id_bindings.into_iter().chain(key_bindings).collect(),
1956        )
1957        .order_by("object_id")
1958        .limit(page.limit() as i64)
1959    } else if let Ok(keys_filter) = HistoricalFilter::try_from(filter.clone()) {
1960        historical::query(checkpoint_viewed_at, page, &keys_filter)
1961    } else {
1962        consistent::query(checkpoint_viewed_at, page, move |query| filter.apply(query))
1963    }
1964}
1965
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Parses a hex literal into an address, panicking on malformed input.
    fn address(hex: &str) -> IotaAddress {
        IotaAddress::from_str(hex).unwrap()
    }

    /// Intersecting owner filters: identical owners are kept, different
    /// owners leave nothing.
    #[test]
    fn test_owner_filter_intersection() {
        let owned_by = |hex: &str| ObjectFilter {
            owner: Some(address(hex)),
            ..Default::default()
        };

        let f0 = owned_by("0x1");
        let f1 = owned_by("0x2");

        assert_eq!(f0.clone().intersect(f0.clone()), Some(f0.clone()));
        assert_eq!(f0.intersect(f1), None);
    }

    /// Intersecting filters that mix `object_ids` and `object_keys`.
    #[test]
    fn test_key_filter_intersection() {
        let i1 = address("0x1");
        let i2 = address("0x2");
        let i3 = address("0x3");
        let i4 = address("0x4");

        // The three `(id, version)` keys used throughout this test.
        let i2_v1 = ObjectKey {
            object_id: i2,
            version: 1.into(),
        };
        let i2_v2 = ObjectKey {
            object_id: i2,
            version: 2.into(),
        };
        let i4_v2 = ObjectKey {
            object_id: i4,
            version: 2.into(),
        };

        let f0 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            object_keys: Some(vec![i2_v1.clone(), i4_v2.clone()]),
            ..Default::default()
        };

        let f1 = ObjectFilter {
            object_ids: Some(vec![i1, i2]),
            object_keys: Some(vec![i4_v2.clone()]),
            ..Default::default()
        };

        let f2 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            ..Default::default()
        };

        let f3 = ObjectFilter {
            object_keys: Some(vec![i2_v2.clone(), i4_v2.clone()]),
            ..Default::default()
        };

        let expected_f0_f1 = ObjectFilter {
            object_ids: Some(vec![i1]),
            object_keys: Some(vec![i2_v1, i4_v2.clone()]),
            ..Default::default()
        };
        assert_eq!(f0.clone().intersect(f1.clone()), Some(expected_f0_f1));

        let expected_f1_f2 = ObjectFilter {
            object_ids: Some(vec![i1]),
            ..Default::default()
        };
        assert_eq!(f1.clone().intersect(f2.clone()), Some(expected_f1_f2));

        let expected_f1_f3 = ObjectFilter {
            object_keys: Some(vec![i2_v2, i4_v2]),
            ..Default::default()
        };
        assert_eq!(f1.intersect(f3.clone()), Some(expected_f1_f3));

        // i2 got a conflicting version assignment
        assert_eq!(f0.intersect(f3.clone()), None);

        // No overlap between these two.
        assert_eq!(f2.intersect(f3), None);
    }
}