1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt::Write,
8};
9
10use async_graphql::{
11 connection::{Connection, CursorType, Edge},
12 dataloader::Loader,
13 *,
14};
15use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
16use iota_indexer::{
17 models::objects::{StoredHistoryObject, StoredObject},
18 schema::{objects, objects_history, objects_version},
19 types::{ObjectStatus as NativeObjectStatus, OwnerType},
20};
21use iota_types::{
22 TypeTag,
23 object::{
24 MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
25 bounded_visitor::BoundedVisitor,
26 },
27};
28use move_core_types::{
29 annotated_value::{MoveStruct, MoveTypeLayout},
30 language_storage::StructTag,
31};
32use serde::{Deserialize, Serialize};
33
34use crate::{
35 config::DEFAULT_PAGE_SIZE,
36 connection::ScanConnection,
37 consistency::{Checkpointed, View, build_objects_query},
38 data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
39 error::Error,
40 filter, or_filter,
41 raw_query::RawQuery,
42 types::{
43 available_range::AvailableRange,
44 balance::{self, Balance},
45 base64::Base64,
46 big_int::BigInt,
47 coin::Coin,
48 coin_metadata::CoinMetadata,
49 cursor::{self, Page, RawPaginated, ScanLimited, Target},
50 digest::Digest,
51 display::{Display, DisplayEntry},
52 dynamic_field::{DynamicField, DynamicFieldName},
53 intersect,
54 iota_address::{IotaAddress, addr},
55 iota_names_registration::{NameFormat, NameRegistration},
56 move_object::MoveObject,
57 move_package::MovePackage,
58 owner::{Owner, OwnerImpl},
59 stake::StakedIota,
60 transaction_block,
61 transaction_block::{TransactionBlock, TransactionBlockFilter},
62 type_filter::{ExactTypeFilter, TypeFilter},
63 uint53::UInt53,
64 },
65};
66
/// An object as served by this module: its address, the form in which its
/// contents are available, and the checkpoint/version context it was read in.
#[derive(Clone, Debug)]
pub(crate) struct Object {
    /// Address (ID) of the object.
    pub address: IotaAddress,
    /// What is known about the object's contents (see `ObjectKind`).
    pub kind: ObjectKind,
    /// Checkpoint this object was viewed at. It is forwarded to nested
    /// lookups made from this object (parent object, previous transaction
    /// block, received transaction blocks).
    pub checkpoint_viewed_at: u64,
    /// Version handed to the dynamic field resolvers as the root version.
    /// Constructors that are not given an explicit value derive it from the
    /// object's own version.
    root_version: u64,
}
88
/// Borrowing wrapper around an `Object` that carries the implementations of
/// the object-specific fields (version, status, digest, owner, ...). The
/// GraphQL resolvers on `Object` delegate to it.
pub(crate) struct ObjectImpl<'o>(pub &'o Object);
91
/// The form in which an object's data is held.
#[derive(Clone, Debug)]
#[expect(clippy::large_enum_variant)]
pub(crate) enum ObjectKind {
    /// A native object that was supplied directly (see `Object::from_native`)
    /// rather than read from a stored row.
    NotIndexed(NativeObject),
    /// A native object together with the stored row it was deserialized
    /// from.
    Indexed(NativeObject, StoredHistoryObject),
    /// The object is wrapped or deleted; only its version is available.
    WrappedOrDeleted(u64),
}
104
// GraphQL-facing counterpart of `ObjectKind`, exposed in the schema under the
// name "ObjectKind". Comments are plain `//` on purpose: async-graphql would
// publish `///` doc comments as schema descriptions.
#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
#[graphql(name = "ObjectKind")]
pub enum ObjectStatus {
    // Counterpart of `ObjectKind::NotIndexed`.
    NotIndexed,
    // Counterpart of `ObjectKind::Indexed`.
    Indexed,
    // Counterpart of `ObjectKind::WrappedOrDeleted`.
    WrappedOrDeleted,
}
117
// GraphQL input identifying one object by address, version and digest.
// (Plain `//` comments: `///` on an `InputObject` would be published as schema
// descriptions.)
#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
pub(crate) struct ObjectRef {
    // Address (ID) of the object.
    pub address: IotaAddress,
    // Version of the object.
    pub version: UInt53,
    // Digest of the object.
    pub digest: Digest,
}
127
// GraphQL input describing which objects a query should return. Every field
// is optional; a filter with all fields unset equals `Default::default()`.
// (Plain `//` comments: `///` on an `InputObject` would be published as schema
// descriptions.)
#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectFilter {
    // Restrict results to objects matching this type filter.
    pub type_: Option<TypeFilter>,

    // Restrict results to objects owned by this address. `apply` only matches
    // rows whose owner type is `OwnerType::Address`.
    pub owner: Option<IotaAddress>,

    // Object IDs to match at any version. `apply` combines these with
    // `object_keys` using OR.
    pub object_ids: Option<Vec<IotaAddress>>,

    // (Object ID, version) pairs to match exactly. `apply` combines these with
    // `object_ids` using OR.
    pub object_keys: Option<Vec<ObjectKey>>,
}
155
// GraphQL input naming one exact version of one object. (Plain `//` comments:
// `///` on an `InputObject` would be published as schema descriptions.)
#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectKey {
    // Address (ID) of the object.
    pub object_id: IotaAddress,
    // Version of the object.
    pub version: UInt53,
}
161
// GraphQL union describing how an object is owned; produced by
// `ObjectImpl::owner` from the native owner of the object. (Plain `//`
// comments: `///` on a `Union` would be published as schema descriptions.)
#[derive(Union, Clone)]
pub(crate) enum ObjectOwner {
    // The object is immutable.
    Immutable(Immutable),
    // The object is shared.
    Shared(Shared),
    // The object is owned by another object. Boxed because `Parent` contains
    // an `Object`.
    Parent(Box<Parent>),
    // The object is owned by an address.
    Address(AddressOwner),
}
170
// Marker type for immutable ownership. It carries no information; the single
// placeholder field is exposed in the schema as "_" and is always set to
// `None` by `ObjectImpl::owner`. (Plain `//` comments: `///` on a
// `SimpleObject` would be published as schema descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct Immutable {
    #[graphql(name = "_")]
    dummy: Option<bool>,
}
178
// Ownership information for a shared object. (Plain `//` comments: `///` on a
// `SimpleObject` would be published as schema descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct Shared {
    // The `initial_shared_version` taken from the native owner.
    initial_shared_version: UInt53,
}
186
// Ownership information for an object owned by another object. (Plain `//`
// comments: `///` on a `SimpleObject` would be published as schema
// descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct Parent {
    // The owning object. `None` when the lookup performed by
    // `ObjectImpl::owner` found nothing or failed.
    parent: Option<Object>,
}
195
// Ownership information for an object owned by an address. (Plain `//`
// comments: `///` on a `SimpleObject` would be published as schema
// descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct AddressOwner {
    // The owning address, viewed at the same checkpoint as the owned object.
    owner: Option<Owner>,
}
204
/// The ways `Object::query` can look an object up. Each variant is translated
/// into its own data loader key.
pub(crate) enum ObjectLookup {
    /// Look the object up as of a checkpoint (loaded through `LatestAtKey`).
    LatestAt {
        /// Checkpoint the lookup is evaluated at.
        checkpoint_viewed_at: u64,
    },

    /// Look the object up relative to a parent's version (loaded through
    /// `ParentVersionKey`).
    UnderParent {
        /// Version of the parent that bounds the lookup.
        parent_version: u64,
        /// Checkpoint the lookup is evaluated at.
        checkpoint_viewed_at: u64,
    },

    /// Look up one exact version (loaded through `HistoricalKey`). That
    /// loader drops the result if the stored row's checkpoint is later than
    /// `checkpoint_viewed_at`.
    VersionAt {
        /// Exact version requested.
        version: u64,
        /// Checkpoint the lookup is evaluated at.
        checkpoint_viewed_at: u64,
    },

    /// Look up one exact version without a checkpoint bound (loaded through
    /// `OptimisticKey`).
    OptimisticVersion {
        /// Exact version requested.
        version: u64,
    },
}
236
/// Pagination cursor for objects: a `cursor::BcsCursor` wrapping a
/// `HistoricalObjectCursor`.
pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;
238
/// Payload of an object pagination cursor. Fields are serialized under the
/// short names "o" and "c".
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct HistoricalObjectCursor {
    /// Raw bytes of the object ID the cursor points at.
    #[serde(rename = "o")]
    object_id: Vec<u8>,
    /// Checkpoint the page this cursor belongs to was viewed at.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
250
251#[expect(clippy::duplicated_attributes)]
254#[derive(Interface)]
255#[graphql(
256 name = "IObject",
257 field(name = "version", ty = "UInt53"),
258 field(
259 name = "status",
260 ty = "ObjectStatus",
261 desc = r#"
262 The current status of the object as read from the off-chain store. The
263 possible states are:
264 - NOT_INDEXED: The object is loaded from serialized data, such as the
265 contents of a genesis or system package upgrade transaction.
266 - INDEXED: The object is retrieved from the off-chain index and
267 represents the most recent or historical state of the object.
268 - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
269 information can be loaded.
270 "#
271 ),
272 field(
273 name = "digest",
274 ty = "Option<String>",
275 desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
276 string."
277 ),
278 field(
279 name = "owner",
280 ty = "Option<ObjectOwner>",
281 desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
282 Immutable and Shared Objects do not have owners."
283 ),
284 field(
285 name = "previous_transaction_block",
286 ty = "Option<TransactionBlock>",
287 desc = "The transaction block that created this version of the object."
288 ),
289 field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
290 field(
291 name = "received_transaction_blocks",
292 arg(name = "first", ty = "Option<u64>"),
293 arg(name = "after", ty = "Option<transaction_block::Cursor>"),
294 arg(name = "last", ty = "Option<u64>"),
295 arg(name = "before", ty = "Option<transaction_block::Cursor>"),
296 arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
297 arg(name = "scan_limit", ty = "Option<u64>"),
298 ty = "ScanConnection<String, TransactionBlock>",
299 desc = "The transaction blocks that sent objects to this object."
300 ),
301 field(
302 name = "bcs",
303 ty = "Option<Base64>",
304 desc = "The Base64-encoded BCS serialization of the object's content."
305 )
306)]
// Implementors of the "IObject" interface declared by the attribute above.
// Each variant wraps one concrete type that is exposed through the interface.
// (Plain `//` comments: `///` here would be published as schema descriptions.)
pub(crate) enum IObject {
    Object(Object),
    MovePackage(MovePackage),
    MoveObject(MoveObject),
    Coin(Coin),
    CoinMetadata(CoinMetadata),
    StakedIota(StakedIota),
}
315
/// Data loader key for `ObjectLookup::VersionAt`: an exact object version,
/// bounded by the checkpoint being viewed.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct HistoricalKey {
    /// Address (ID) of the object.
    id: IotaAddress,
    /// Exact version requested.
    version: u64,
    /// Checkpoint bound; rows written at a later checkpoint are not returned.
    checkpoint_viewed_at: u64,
}
325
/// Data loader key for `ObjectLookup::OptimisticVersion`: an exact object
/// version with no checkpoint bound.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct OptimisticKey {
    /// Address (ID) of the object.
    id: IotaAddress,
    /// Exact version requested.
    version: u64,
}
334
/// Data loader key for `ObjectLookup::UnderParent`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct ParentVersionKey {
    /// Address (ID) of the object.
    id: IotaAddress,
    /// Version of the parent that bounds the lookup.
    parent_version: u64,
    /// Checkpoint the lookup is evaluated at.
    checkpoint_viewed_at: u64,
}
346
/// Data loader key for `ObjectLookup::LatestAt`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestAtKey {
    /// Address (ID) of the object.
    id: IotaAddress,
    /// Checkpoint the lookup is evaluated at.
    checkpoint_viewed_at: u64,
}
354
355#[Object]
360impl Object {
361 pub(crate) async fn address(&self) -> IotaAddress {
362 OwnerImpl::from(self).address().await
363 }
364
365 pub(crate) async fn objects(
367 &self,
368 ctx: &Context<'_>,
369 first: Option<u64>,
370 after: Option<Cursor>,
371 last: Option<u64>,
372 before: Option<Cursor>,
373 filter: Option<ObjectFilter>,
374 ) -> Result<Connection<String, MoveObject>> {
375 OwnerImpl::from(self)
376 .objects(ctx, first, after, last, before, filter)
377 .await
378 }
379
380 pub(crate) async fn balance(
383 &self,
384 ctx: &Context<'_>,
385 type_: Option<ExactTypeFilter>,
386 ) -> Result<Option<Balance>> {
387 OwnerImpl::from(self).balance(ctx, type_).await
388 }
389
390 pub(crate) async fn balances(
392 &self,
393 ctx: &Context<'_>,
394 first: Option<u64>,
395 after: Option<balance::Cursor>,
396 last: Option<u64>,
397 before: Option<balance::Cursor>,
398 ) -> Result<Connection<String, Balance>> {
399 OwnerImpl::from(self)
400 .balances(ctx, first, after, last, before)
401 .await
402 }
403
404 pub(crate) async fn coins(
409 &self,
410 ctx: &Context<'_>,
411 first: Option<u64>,
412 after: Option<Cursor>,
413 last: Option<u64>,
414 before: Option<Cursor>,
415 type_: Option<ExactTypeFilter>,
416 ) -> Result<Connection<String, Coin>> {
417 OwnerImpl::from(self)
418 .coins(ctx, first, after, last, before, type_)
419 .await
420 }
421
422 pub(crate) async fn staked_iotas(
424 &self,
425 ctx: &Context<'_>,
426 first: Option<u64>,
427 after: Option<Cursor>,
428 last: Option<u64>,
429 before: Option<Cursor>,
430 ) -> Result<Connection<String, StakedIota>> {
431 OwnerImpl::from(self)
432 .staked_iotas(ctx, first, after, last, before)
433 .await
434 }
435
436 pub(crate) async fn iota_names_default_name(
439 &self,
440 ctx: &Context<'_>,
441 format: Option<NameFormat>,
442 ) -> Result<Option<String>> {
443 OwnerImpl::from(self)
444 .iota_names_default_name(ctx, format)
445 .await
446 }
447
448 pub(crate) async fn iota_names_registrations(
451 &self,
452 ctx: &Context<'_>,
453 first: Option<u64>,
454 after: Option<Cursor>,
455 last: Option<u64>,
456 before: Option<Cursor>,
457 ) -> Result<Connection<String, NameRegistration>> {
458 OwnerImpl::from(self)
459 .iota_names_registrations(ctx, first, after, last, before)
460 .await
461 }
462
463 pub(crate) async fn version(&self) -> UInt53 {
464 ObjectImpl(self).version().await
465 }
466
467 pub(crate) async fn status(&self) -> ObjectStatus {
476 ObjectImpl(self).status().await
477 }
478
479 pub(crate) async fn digest(&self) -> Option<String> {
482 ObjectImpl(self).digest().await
483 }
484
485 pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
488 ObjectImpl(self).owner(ctx).await
489 }
490
491 pub(crate) async fn previous_transaction_block(
493 &self,
494 ctx: &Context<'_>,
495 ) -> Result<Option<TransactionBlock>> {
496 ObjectImpl(self).previous_transaction_block(ctx).await
497 }
498
499 pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
503 ObjectImpl(self).storage_rebate().await
504 }
505
506 #[graphql(
532 complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
533 )]
534 pub(crate) async fn received_transaction_blocks(
535 &self,
536 ctx: &Context<'_>,
537 first: Option<u64>,
538 after: Option<transaction_block::Cursor>,
539 last: Option<u64>,
540 before: Option<transaction_block::Cursor>,
541 filter: Option<TransactionBlockFilter>,
542 scan_limit: Option<u64>,
543 ) -> Result<ScanConnection<String, TransactionBlock>> {
544 ObjectImpl(self)
545 .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
546 .await
547 }
548
549 pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
551 ObjectImpl(self).bcs().await
552 }
553
554 async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
558 ObjectImpl(self).display(ctx).await
559 }
560
561 async fn dynamic_field(
568 &self,
569 ctx: &Context<'_>,
570 name: DynamicFieldName,
571 ) -> Result<Option<DynamicField>> {
572 OwnerImpl::from(self)
573 .dynamic_field(ctx, name, Some(self.root_version()))
574 .await
575 }
576
577 async fn dynamic_object_field(
586 &self,
587 ctx: &Context<'_>,
588 name: DynamicFieldName,
589 ) -> Result<Option<DynamicField>> {
590 OwnerImpl::from(self)
591 .dynamic_object_field(ctx, name, Some(self.root_version()))
592 .await
593 }
594
595 async fn dynamic_fields(
600 &self,
601 ctx: &Context<'_>,
602 first: Option<u64>,
603 after: Option<Cursor>,
604 last: Option<u64>,
605 before: Option<Cursor>,
606 ) -> Result<Connection<String, DynamicField>> {
607 OwnerImpl::from(self)
608 .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
609 .await
610 }
611
612 async fn as_move_object(&self) -> Option<MoveObject> {
614 MoveObject::try_from(self).ok()
615 }
616
617 async fn as_move_package(&self) -> Option<MovePackage> {
619 MovePackage::try_from(self).ok()
620 }
621}
622
623impl ObjectImpl<'_> {
624 pub(crate) async fn version(&self) -> UInt53 {
625 self.0.version_impl().into()
626 }
627
628 pub(crate) async fn status(&self) -> ObjectStatus {
629 ObjectStatus::from(&self.0.kind)
630 }
631
632 pub(crate) async fn digest(&self) -> Option<String> {
633 self.0
634 .native_impl()
635 .map(|native| native.digest().base58_encode())
636 }
637
638 pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
639 use NativeOwner as O;
640
641 let native = self.0.native_impl()?;
642
643 match native.owner {
644 O::AddressOwner(address) => {
645 let address = IotaAddress::from(address);
646 Some(ObjectOwner::Address(AddressOwner {
647 owner: Some(Owner {
648 address,
649 checkpoint_viewed_at: self.0.checkpoint_viewed_at,
650 root_version: None,
651 }),
652 }))
653 }
654 O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
655 O::ObjectOwner(address) => {
656 let parent = Object::query(
657 ctx,
658 address.into(),
659 Object::latest_at(self.0.checkpoint_viewed_at),
660 )
661 .await
662 .ok()
663 .flatten();
664
665 Some(ObjectOwner::Parent(Box::new(Parent { parent })))
666 }
667 O::Shared {
668 initial_shared_version,
669 } => Some(ObjectOwner::Shared(Shared {
670 initial_shared_version: initial_shared_version.value().into(),
671 })),
672 }
673 }
674
675 pub(crate) async fn previous_transaction_block(
676 &self,
677 ctx: &Context<'_>,
678 ) -> Result<Option<TransactionBlock>> {
679 let Some(native) = self.0.native_impl() else {
680 return Ok(None);
681 };
682 let digest = native.previous_transaction;
683 let key = transaction_block::DigestKey::new(digest.into(), self.0.checkpoint_viewed_at);
684
685 TransactionBlock::query(ctx, key.into()).await.extend()
686 }
687
688 pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
689 self.0
690 .native_impl()
691 .map(|native| BigInt::from(native.storage_rebate))
692 }
693
694 pub(crate) async fn received_transaction_blocks(
695 &self,
696 ctx: &Context<'_>,
697 first: Option<u64>,
698 after: Option<transaction_block::Cursor>,
699 last: Option<u64>,
700 before: Option<transaction_block::Cursor>,
701 filter: Option<TransactionBlockFilter>,
702 scan_limit: Option<u64>,
703 ) -> Result<ScanConnection<String, TransactionBlock>> {
704 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
705
706 let Some(filter) = filter
707 .unwrap_or_default()
708 .intersect(TransactionBlockFilter {
709 recv_address: Some(self.0.address),
710 ..Default::default()
711 })
712 else {
713 return Ok(ScanConnection::new(false, false));
714 };
715
716 TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
717 .await
718 .extend()
719 }
720
721 pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
722 use ObjectKind as K;
723 Ok(match &self.0.kind {
724 K::WrappedOrDeleted(_) => None,
725 K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),
729
730 K::NotIndexed(native) => {
731 let bytes = bcs::to_bytes(native)
732 .map_err(|e| {
733 Error::Internal(format!(
734 "Failed to serialize object at {}: {e}",
735 self.0.address
736 ))
737 })
738 .extend()?;
739 Some(Base64::from(&bytes))
740 }
741 })
742 }
743
744 pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
747 let Some(native) = self.0.native_impl() else {
748 return Ok(None);
749 };
750
751 let move_object = native
752 .data
753 .try_as_move()
754 .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
755 .extend()?;
756
757 let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
758 .await
759 .extend()?;
760
761 let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
762 .await
763 .extend()?
764 else {
765 return Ok(None);
766 };
767
768 Ok(Some(display.render(&move_struct).extend()?))
769 }
770}
771
772impl Object {
773 pub(crate) fn from_native(
788 address: IotaAddress,
789 native: NativeObject,
790 checkpoint_viewed_at: u64,
791 root_version: Option<u64>,
792 ) -> Object {
793 let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
794 Object {
795 address,
796 kind: ObjectKind::NotIndexed(native),
797 checkpoint_viewed_at,
798 root_version,
799 }
800 }
801
802 pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
803 use ObjectKind as K;
804
805 match &self.kind {
806 K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
807 K::WrappedOrDeleted(_) => None,
808 }
809 }
810
811 pub(crate) fn version_impl(&self) -> u64 {
812 use ObjectKind as K;
813
814 match &self.kind {
815 K::NotIndexed(native) | K::Indexed(native, _) => native.version().value(),
816 K::WrappedOrDeleted(object_version) => *object_version,
817 }
818 }
819
820 pub(crate) fn root_version(&self) -> u64 {
824 self.root_version
825 }
826
827 pub(crate) async fn paginate(
834 db: &Db,
835 page: Page<Cursor>,
836 filter: ObjectFilter,
837 checkpoint_viewed_at: u64,
838 ) -> Result<Connection<String, Object>, Error> {
839 Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
840 }
841
842 pub(crate) async fn paginate_subtype<T: OutputType>(
859 db: &Db,
860 page: Page<Cursor>,
861 filter: ObjectFilter,
862 checkpoint_viewed_at: u64,
863 downcast: impl Fn(Object) -> Result<T, Error>,
864 ) -> Result<Connection<String, T>, Error> {
865 let cursor_viewed_at = page.validate_cursor_consistency()?;
870 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
871
872 let Some((prev, next, results)) = db
873 .execute_repeatable(move |conn| {
874 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
875 return Ok::<_, diesel::result::Error>(None);
876 };
877
878 Ok(Some(page.paginate_raw_query::<StoredHistoryObject>(
879 conn,
880 checkpoint_viewed_at,
881 objects_query(&filter, range, &page),
882 )?))
883 })
884 .await?
885 else {
886 return Err(Error::Client(
887 "Requested data is outside the available range".to_string(),
888 ));
889 };
890
891 let mut conn: Connection<String, T> = Connection::new(prev, next);
892
893 for stored in results {
894 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
897 let object =
898 Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
899 conn.edges.push(Edge::new(cursor, downcast(object)?));
900 }
901
902 Ok(conn)
903 }
904
905 pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
907 ObjectLookup::LatestAt {
908 checkpoint_viewed_at,
909 }
910 }
911
912 pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
915 ObjectLookup::UnderParent {
916 parent_version,
917 checkpoint_viewed_at,
918 }
919 }
920
921 pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
923 ObjectLookup::VersionAt {
924 version,
925 checkpoint_viewed_at,
926 }
927 }
928
929 pub(crate) fn at_optimistic_version(version: u64) -> ObjectLookup {
931 ObjectLookup::OptimisticVersion { version }
932 }
933
934 pub(crate) async fn query(
935 ctx: &Context<'_>,
936 id: IotaAddress,
937 key: ObjectLookup,
938 ) -> Result<Option<Self>, Error> {
939 let DataLoader(loader) = &ctx.data_unchecked();
940
941 match key {
942 ObjectLookup::VersionAt {
943 version,
944 checkpoint_viewed_at,
945 } => {
946 loader
947 .load_one(HistoricalKey {
948 id,
949 version,
950 checkpoint_viewed_at,
951 })
952 .await
953 }
954
955 ObjectLookup::OptimisticVersion { version } => {
956 loader.load_one(OptimisticKey { id, version }).await
957 }
958
959 ObjectLookup::UnderParent {
960 parent_version,
961 checkpoint_viewed_at,
962 } => {
963 loader
964 .load_one(ParentVersionKey {
965 id,
966 parent_version,
967 checkpoint_viewed_at,
968 })
969 .await
970 }
971
972 ObjectLookup::LatestAt {
973 checkpoint_viewed_at,
974 } => {
975 loader
976 .load_one(LatestAtKey {
977 id,
978 checkpoint_viewed_at,
979 })
980 .await
981 }
982 }
983 }
984
985 pub(crate) async fn query_singleton(
989 db: &Db,
990 type_: StructTag,
991 checkpoint_viewed_at: u64,
992 ) -> Result<Option<Object>, Error> {
993 let filter = ObjectFilter {
994 type_: Some(TypeFilter::ByType(type_)),
995 ..Default::default()
996 };
997
998 let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;
999
1000 Ok(connection.edges.into_iter().next().map(|edge| edge.node))
1001 }
1002
1003 pub(crate) fn try_from_stored_history_object(
1015 history_object: StoredHistoryObject,
1016 checkpoint_viewed_at: u64,
1017 root_version: Option<u64>,
1018 ) -> Result<Self, Error> {
1019 let address = addr(&history_object.object_id)?;
1020
1021 let object_status =
1022 NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
1023 Error::Internal(format!(
1024 "Unknown object status {} for object {} at version {}",
1025 history_object.object_status, address, history_object.object_version
1026 ))
1027 })?;
1028
1029 match object_status {
1030 NativeObjectStatus::Active => {
1031 let Some(serialized_object) = &history_object.serialized_object else {
1032 return Err(Error::Internal(format!(
1033 "Live object {} at version {} cannot have missing serialized_object field",
1034 address, history_object.object_version
1035 )));
1036 };
1037
1038 let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
1039 Error::Internal(format!("Failed to deserialize object {address}"))
1040 })?;
1041
1042 let root_version =
1043 root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
1044 Ok(Self {
1045 address,
1046 kind: ObjectKind::Indexed(native_object, history_object),
1047 checkpoint_viewed_at,
1048 root_version,
1049 })
1050 }
1051 NativeObjectStatus::WrappedOrDeleted => Ok(Self {
1052 address,
1053 kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
1054 checkpoint_viewed_at,
1055 root_version: history_object.object_version as u64,
1056 }),
1057 }
1058 }
1059
1060 pub(crate) fn try_from_stored_object(
1061 stored_object: StoredObject,
1062 checkpoint_viewed_at: u64,
1063 ) -> Result<Self, Error> {
1064 let address = addr(&stored_object.object_id)?;
1065
1066 let native_object = bcs::from_bytes(&stored_object.serialized_object)
1067 .map_err(|_| Error::Internal(format!("Failed to deserialize object {address}")))?;
1068
1069 let root_version = version_for_dynamic_fields(&native_object);
1070
1071 let stored_history_like = StoredHistoryObject {
1072 object_id: stored_object.object_id,
1073 object_version: stored_object.object_version,
1074 object_digest: Some(stored_object.object_digest),
1075 object_status: NativeObjectStatus::Active as i16,
1076 checkpoint_sequence_number: checkpoint_viewed_at as i64,
1077 serialized_object: Some(stored_object.serialized_object),
1078 object_type: stored_object.object_type,
1079 object_type_package: stored_object.object_type_package,
1080 object_type_module: stored_object.object_type_module,
1081 object_type_name: stored_object.object_type_name,
1082 owner_type: Some(stored_object.owner_type),
1083 owner_id: stored_object.owner_id,
1084 coin_type: stored_object.coin_type,
1085 coin_balance: stored_object.coin_balance,
1086 df_kind: stored_object.df_kind,
1087 };
1088
1089 Ok(Self {
1090 address,
1091 kind: ObjectKind::Indexed(native_object, stored_history_like),
1092 checkpoint_viewed_at,
1093 root_version,
1094 })
1095 }
1096}
1097
1098fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
1109 native.as_inner().version().into()
1110}
1111
1112impl ObjectFilter {
1113 pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
1119 macro_rules! intersect {
1120 ($field:ident, $body:expr) => {
1121 intersect::field(self.$field, other.$field, $body)
1122 };
1123 }
1124
1125 let keys = intersect::field(self.keys(), other.keys(), |k, l| {
1128 let mut combined = BTreeMap::new();
1129
1130 for (id, v) in k {
1131 if let Some(w) = l.get(&id).copied() {
1132 combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
1133 }
1134 }
1135
1136 (!combined.is_empty()).then_some(combined)
1140 })?;
1141
1142 let object_ids = {
1149 let partition: Vec<_> = keys
1150 .iter()
1151 .flatten()
1152 .filter_map(|(id, v)| v.is_none().then_some(*id))
1153 .collect();
1154
1155 (!partition.is_empty()).then_some(partition)
1156 };
1157
1158 let object_keys = {
1159 let partition: Vec<_> = keys
1160 .iter()
1161 .flatten()
1162 .filter_map(|(id, v)| {
1163 Some(ObjectKey {
1164 object_id: *id,
1165 version: (*v)?.into(),
1166 })
1167 })
1168 .collect();
1169
1170 (!partition.is_empty()).then_some(partition)
1171 };
1172
1173 Some(Self {
1174 type_: intersect!(type_, TypeFilter::intersect)?,
1175 owner: intersect!(owner, intersect::by_eq)?,
1176 object_ids,
1177 object_keys,
1178 })
1179 }
1180
1181 fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
1185 if self.object_keys.is_none() && self.object_ids.is_none() {
1186 return None;
1187 }
1188
1189 Some(BTreeMap::from_iter(
1190 self.object_keys
1191 .iter()
1192 .flatten()
1193 .map(|key| (key.object_id, Some(key.version.into())))
1194 .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
1197 ))
1198 }
1199
1200 pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
1203 if let Some(object_ids) = &self.object_ids {
1206 if object_ids.is_empty() {
1208 query = or_filter!(query, "1=0");
1209 } else {
1210 let mut inner = String::new();
1211 let mut prefix = "object_id IN (";
1212 for id in object_ids {
1213 write!(
1215 &mut inner,
1216 "{prefix}'\\x{}'::bytea",
1217 hex::encode(id.into_vec())
1218 )
1219 .unwrap();
1220 prefix = ", ";
1221 }
1222 inner.push(')');
1223 query = or_filter!(query, inner);
1224 }
1225 }
1226
1227 if let Some(object_keys) = &self.object_keys {
1228 if object_keys.is_empty() {
1230 query = or_filter!(query, "1=0");
1231 } else {
1232 let mut inner = String::new();
1233 let mut prefix = "(";
1234 for ObjectKey { object_id, version } in object_keys {
1235 write!(
1237 &mut inner,
1238 "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
1239 hex::encode(object_id.into_vec()),
1240 version
1241 )
1242 .unwrap();
1243 prefix = " OR ";
1244 }
1245 inner.push(')');
1246 query = or_filter!(query, inner);
1247 }
1248 }
1249
1250 if let Some(owner) = self.owner {
1251 query = filter!(
1252 query,
1253 format!(
1254 "owner_id = '\\x{}'::bytea AND owner_type = {}",
1255 hex::encode(owner.into_vec()),
1256 OwnerType::Address as i16
1257 )
1258 );
1259 }
1260
1261 if let Some(type_) = &self.type_ {
1262 return type_.apply_raw(
1263 query,
1264 "object_type",
1265 "object_type_package",
1266 "object_type_module",
1267 "object_type_name",
1268 );
1269 }
1270
1271 query
1272 }
1273
1274 pub(crate) fn has_filters(&self) -> bool {
1275 self != &Default::default()
1276 }
1277}
1278
1279impl HistoricalObjectCursor {
1280 pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
1281 Self {
1282 object_id,
1283 checkpoint_viewed_at,
1284 }
1285 }
1286}
1287
/// Exposes the checkpoint recorded in the cursor payload.
impl Checkpointed for Cursor {
    /// The `checkpoint_viewed_at` stored in the wrapped
    /// `HistoricalObjectCursor`.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
1293
/// Object cursors override nothing from `ScanLimited`.
impl ScanLimited for Cursor {}
1295
1296impl RawPaginated<Cursor> for StoredHistoryObject {
1297 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1298 filter!(
1299 query,
1300 format!(
1301 "candidates.object_id >= '\\x{}'::bytea",
1302 hex::encode(cursor.object_id.clone())
1303 )
1304 )
1305 }
1306
1307 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1308 filter!(
1309 query,
1310 format!(
1311 "candidates.object_id <= '\\x{}'::bytea",
1312 hex::encode(cursor.object_id.clone())
1313 )
1314 )
1315 }
1316
1317 fn order(asc: bool, query: RawQuery) -> RawQuery {
1318 if asc {
1319 query.order_by("candidates.object_id ASC")
1320 } else {
1321 query.order_by("candidates.object_id DESC")
1322 }
1323 }
1324}
1325
1326impl Target<Cursor> for StoredHistoryObject {
1327 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1328 Cursor::new(HistoricalObjectCursor::new(
1329 self.object_id.clone(),
1330 checkpoint_viewed_at,
1331 ))
1332 }
1333}
1334
1335impl Loader<HistoricalKey> for Db {
1336 type Value = Object;
1337 type Error = Error;
1338
1339 async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
1340 use objects_history::dsl as h;
1341 use objects_version::dsl as v;
1342
1343 if keys.is_empty() {
1344 return Ok(HashMap::new());
1345 }
1346
1347 let id_versions: BTreeSet<_> = keys
1348 .iter()
1349 .map(|key| (key.id.into_vec(), key.version as i64))
1350 .collect();
1351
1352 let objects: Vec<StoredHistoryObject> = self
1353 .execute(move |conn| {
1354 conn.results(move || {
1355 let mut query = h::objects_history
1356 .inner_join(
1357 v::objects_version.on(v::cp_sequence_number
1358 .eq(h::checkpoint_sequence_number)
1359 .and(v::object_id.eq(h::object_id))
1360 .and(v::object_version.eq(h::object_version))),
1361 )
1362 .select(StoredHistoryObject::as_select())
1363 .into_boxed();
1364
1365 for (id, version) in id_versions.iter().cloned() {
1366 query =
1367 query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
1368 }
1369
1370 query
1371 })
1372 })
1373 .await
1374 .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;
1375
1376 let mut id_version_to_stored = BTreeMap::new();
1377 for stored in objects {
1378 let key = (addr(&stored.object_id)?, stored.object_version as u64);
1379 id_version_to_stored.insert(key, stored);
1380 }
1381
1382 let mut result = HashMap::new();
1383 for key in keys {
1384 let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
1385 continue;
1386 };
1387
1388 if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1392 continue;
1393 }
1394
1395 let object = Object::try_from_stored_history_object(
1396 stored.clone(),
1397 key.checkpoint_viewed_at,
1398 None,
1400 )?;
1401 result.insert(*key, object);
1402 }
1403
1404 Ok(result)
1405 }
1406}
1407
1408impl Loader<OptimisticKey> for Db {
1409 type Value = Object;
1410 type Error = Error;
1411
1412 async fn load(&self, keys: &[OptimisticKey]) -> Result<HashMap<OptimisticKey, Object>, Error> {
1413 use objects::dsl as o;
1414
1415 if keys.is_empty() {
1416 return Ok(HashMap::new());
1417 }
1418
1419 let id_versions: BTreeSet<_> = keys
1420 .iter()
1421 .map(|key| (key.id.into_vec(), key.version as i64))
1422 .collect();
1423
1424 let objects: Vec<StoredObject> = self
1425 .execute(move |conn| {
1426 conn.results(move || {
1427 let mut query = o::objects.select(StoredObject::as_select()).into_boxed();
1428 for (id, version) in id_versions.iter().cloned() {
1429 query =
1430 query.or_filter(o::object_id.eq(id).and(o::object_version.eq(version)));
1431 }
1432 query
1433 })
1434 })
1435 .await
1436 .map_err(|e| Error::Internal(format!("Failed to fetch optimistic objects: {e}")))?;
1437
1438 let mut result = HashMap::new();
1439 let id_version_to_stored = objects
1440 .into_iter()
1441 .map(|stored| {
1442 addr(&stored.object_id).map(|id| ((id, stored.object_version as u64), stored))
1443 })
1444 .collect::<Result<BTreeMap<_, _>, _>>()?;
1445
1446 let mut missing_keys = Vec::new();
1448 for key in keys {
1449 if let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) {
1450 let object = Object::try_from_stored_object(stored.clone(), u64::MAX)?;
1451 result.insert(*key, object);
1452 } else {
1453 missing_keys.push(*key);
1454 }
1455 }
1456
1457 if !missing_keys.is_empty() {
1459 let historical_keys: Vec<HistoricalKey> = missing_keys
1460 .iter()
1461 .map(|key| HistoricalKey {
1462 id: key.id,
1463 version: key.version,
1464 checkpoint_viewed_at: u64::MAX,
1465 })
1466 .collect();
1467
1468 let historical_result: HashMap<HistoricalKey, Object> =
1469 self.load(&historical_keys).await?;
1470
1471 for (historical_key, object) in historical_result {
1472 let optimistic_key = OptimisticKey {
1473 id: historical_key.id,
1474 version: historical_key.version,
1475 };
1476 result.insert(optimistic_key, object);
1477 }
1478 }
1479
1480 Ok(result)
1481 }
1482}
1483
1484impl Loader<ParentVersionKey> for Db {
1485 type Value = Object;
1486 type Error = Error;
1487
1488 async fn load(
1489 &self,
1490 keys: &[ParentVersionKey],
1491 ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
1492 #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
1495 struct GroupKey {
1496 checkpoint_viewed_at: u64,
1497 parent_version: u64,
1498 }
1499
1500 let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1501 for key in keys {
1502 let group_key = GroupKey {
1503 checkpoint_viewed_at: key.checkpoint_viewed_at,
1504 parent_version: key.parent_version,
1505 };
1506
1507 keys_by_cursor_and_parent_version
1508 .entry(group_key)
1509 .or_default()
1510 .insert(key.id.into_vec());
1511 }
1512
1513 let futures = keys_by_cursor_and_parent_version
1515 .into_iter()
1516 .map(|(group_key, ids)| {
1517 self.execute(move |conn| {
1518 let stored: Vec<StoredHistoryObject> = conn.results(move || {
1519 use objects_history::dsl as h;
1520 use objects_version::dsl as v;
1521
1522 h::objects_history
1523 .inner_join(
1524 v::objects_version.on(v::cp_sequence_number
1525 .eq(h::checkpoint_sequence_number)
1526 .and(v::object_id.eq(h::object_id))
1527 .and(v::object_version.eq(h::object_version))),
1528 )
1529 .select(StoredHistoryObject::as_select())
1530 .filter(v::object_id.eq_any(ids.iter().cloned()))
1531 .filter(v::object_version.le(group_key.parent_version as i64))
1532 .distinct_on(v::object_id)
1533 .order_by(v::object_id)
1534 .then_order_by(v::object_version.desc())
1535 .into_boxed()
1536 })?;
1537
1538 Ok::<_, diesel::result::Error>(
1539 stored
1540 .into_iter()
1541 .map(|stored| (group_key, stored))
1542 .collect::<Vec<_>>(),
1543 )
1544 })
1545 });
1546
1547 let groups = futures::future::join_all(futures).await;
1549
1550 let mut results = HashMap::new();
1551 for group in groups {
1552 for (group_key, stored) in
1553 group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1554 {
1555 if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1558 continue;
1559 }
1560
1561 let object = Object::try_from_stored_history_object(
1562 stored,
1563 group_key.checkpoint_viewed_at,
1564 Some(group_key.parent_version),
1567 )?;
1568
1569 let key = ParentVersionKey {
1570 id: object.address,
1571 checkpoint_viewed_at: group_key.checkpoint_viewed_at,
1572 parent_version: group_key.parent_version,
1573 };
1574
1575 results.insert(key, object);
1576 }
1577 }
1578
1579 Ok(results)
1580 }
1581}
1582
1583impl Loader<LatestAtKey> for Db {
1584 type Value = Object;
1585 type Error = Error;
1586
1587 async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
1588 let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1591
1592 for key in keys {
1593 keys_by_cursor_and_parent_version
1594 .entry(key.checkpoint_viewed_at)
1595 .or_default()
1596 .insert(key.id);
1597 }
1598
1599 let futures =
1601 keys_by_cursor_and_parent_version
1602 .into_iter()
1603 .map(|(checkpoint_viewed_at, ids)| {
1604 self.execute_repeatable(move |conn| {
1605 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)?
1606 else {
1607 return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
1608 vec![],
1609 );
1610 };
1611
1612 let filter = ObjectFilter {
1613 object_ids: Some(ids.iter().cloned().collect()),
1614 ..Default::default()
1615 };
1616
1617 Ok(conn
1618 .results(move || {
1619 build_objects_query(
1620 View::Consistent,
1621 range,
1622 &Page::bounded(ids.len() as u64),
1623 |q| filter.apply(q),
1624 |q| q,
1625 )
1626 .into_boxed()
1627 })?
1628 .into_iter()
1629 .map(|r| (checkpoint_viewed_at, r))
1630 .collect())
1631 })
1632 });
1633
1634 let groups = futures::future::join_all(futures).await;
1636
1637 let mut results = HashMap::new();
1638 for group in groups {
1639 for (checkpoint_viewed_at, stored) in
1640 group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1641 {
1642 let object =
1643 Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
1644
1645 let key = LatestAtKey {
1646 id: object.address,
1647 checkpoint_viewed_at,
1648 };
1649
1650 results.insert(key, object);
1651 }
1652 }
1653
1654 Ok(results)
1655 }
1656}
1657
1658impl From<&ObjectKind> for ObjectStatus {
1659 fn from(kind: &ObjectKind) -> Self {
1660 match kind {
1661 ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
1662 ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
1663 ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
1664 }
1665 }
1666}
1667
1668impl From<&Object> for OwnerImpl {
1669 fn from(object: &Object) -> Self {
1670 OwnerImpl {
1671 address: object.address,
1672 checkpoint_viewed_at: object.checkpoint_viewed_at,
1673 }
1674 }
1675}
1676
1677pub(crate) async fn deserialize_move_struct(
1678 move_object: &NativeMoveObject,
1679 resolver: &PackageResolver,
1680) -> Result<(StructTag, MoveStruct), Error> {
1681 let struct_tag = StructTag::from(move_object.type_().clone());
1682 let contents = move_object.contents();
1683 let move_type_layout = resolver
1684 .type_layout(TypeTag::from(struct_tag.clone()))
1685 .await
1686 .map_err(|e| {
1687 Error::Internal(format!(
1688 "Error fetching layout for type {}: {e}",
1689 struct_tag.to_canonical_string(true)
1690 ))
1691 })?;
1692
1693 let MoveTypeLayout::Struct(layout) = move_type_layout else {
1694 return Err(Error::Internal("Object is not a move struct".to_string()));
1695 };
1696
1697 let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
1701 Error::Internal(format!(
1702 "Error deserializing move struct for type {}: {e}",
1703 struct_tag.to_canonical_string(true)
1704 ))
1705 })?;
1706
1707 Ok((struct_tag, move_struct))
1708}
1709
1710fn objects_query(filter: &ObjectFilter, range: AvailableRange, page: &Page<Cursor>) -> RawQuery
1715where
1716{
1717 if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
1718 let ids_only_filter = ObjectFilter {
1721 object_keys: None,
1722 ..filter.clone()
1723 };
1724 let (id_query, id_bindings) = build_objects_query(
1725 View::Consistent,
1726 range,
1727 page,
1728 move |query| ids_only_filter.apply(query),
1729 move |newer| newer,
1730 )
1731 .finish();
1732
1733 let keys_only_filter = ObjectFilter {
1734 object_ids: None,
1735 ..filter.clone()
1736 };
1737 let (key_query, key_bindings) = build_objects_query(
1738 View::Historical,
1739 range,
1740 page,
1741 move |query| keys_only_filter.apply(query),
1742 move |newer| newer,
1743 )
1744 .finish();
1745
1746 RawQuery::new(
1747 format!("SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates",),
1748 id_bindings.into_iter().chain(key_bindings).collect(),
1749 )
1750 .order_by("object_id")
1751 .limit(page.limit() as i64)
1752 } else {
1753 let view = if filter.object_keys.is_some() || !filter.has_filters() {
1755 View::Historical
1756 } else {
1757 View::Consistent
1758 };
1759
1760 build_objects_query(
1761 view,
1762 range,
1763 page,
1764 move |query| filter.apply(query),
1765 move |newer| newer,
1766 )
1767 }
1768}
1769
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Parses a hex literal into an address, panicking if it is malformed.
    fn address(hex: &str) -> IotaAddress {
        IotaAddress::from_str(hex).unwrap()
    }

    /// Filters with the same owner intersect to themselves; different owners do not intersect.
    #[test]
    fn test_owner_filter_intersection() {
        let owned_by = |hex: &str| ObjectFilter {
            owner: Some(address(hex)),
            ..Default::default()
        };

        let f0 = owned_by("0x1");
        let f1 = owned_by("0x2");

        assert_eq!(f0.clone().intersect(f0.clone()), Some(f0.clone()));
        assert_eq!(f0.clone().intersect(f1.clone()), None);
    }

    /// Exercises intersection of filters that combine `object_ids` and `object_keys`.
    #[test]
    fn test_key_filter_intersection() {
        let i1 = address("0x1");
        let i2 = address("0x2");
        let i3 = address("0x3");
        let i4 = address("0x4");

        // The three distinct object keys used by the filters below.
        let i2_v1 = ObjectKey {
            object_id: i2,
            version: 1.into(),
        };
        let i2_v2 = ObjectKey {
            object_id: i2,
            version: 2.into(),
        };
        let i4_v2 = ObjectKey {
            object_id: i4,
            version: 2.into(),
        };

        let f0 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            object_keys: Some(vec![i2_v1.clone(), i4_v2.clone()]),
            ..Default::default()
        };

        let f1 = ObjectFilter {
            object_ids: Some(vec![i1, i2]),
            object_keys: Some(vec![i4_v2.clone()]),
            ..Default::default()
        };

        let f2 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            ..Default::default()
        };

        let f3 = ObjectFilter {
            object_keys: Some(vec![i2_v2.clone(), i4_v2.clone()]),
            ..Default::default()
        };

        assert_eq!(
            f0.clone().intersect(f1.clone()),
            Some(ObjectFilter {
                object_ids: Some(vec![i1]),
                object_keys: Some(vec![i2_v1.clone(), i4_v2.clone()]),
                ..Default::default()
            })
        );

        assert_eq!(
            f1.clone().intersect(f2.clone()),
            Some(ObjectFilter {
                object_ids: Some(vec![i1]),
                ..Default::default()
            })
        );

        assert_eq!(
            f1.clone().intersect(f3.clone()),
            Some(ObjectFilter {
                object_keys: Some(vec![i2_v2.clone(), i4_v2.clone()]),
                ..Default::default()
            })
        );

        assert_eq!(f0.clone().intersect(f3.clone()), None);

        assert_eq!(f2.clone().intersect(f3.clone()), None);
    }
}