1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt::Write,
8};
9
10use async_graphql::{
11 connection::{Connection, CursorType, Edge},
12 dataloader::Loader,
13 *,
14};
15use diesel::{
16 BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper, sql_types,
17};
18use iota_indexer::{
19 models::objects::{StoredHistoryObject, StoredObject},
20 schema::{objects, objects_history, objects_version},
21 types::{ObjectStatus as NativeObjectStatus, OwnerType},
22};
23use iota_types::{
24 base_types::{StructTag, TypeTag},
25 object::{
26 MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
27 bounded_visitor::BoundedVisitor,
28 },
29};
30use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout};
31use serde::{Deserialize, Serialize};
32
33use crate::{
34 backward_view::{HistoricalFilter, consistent, historical},
35 config::DEFAULT_PAGE_SIZE,
36 connection::ScanConnection,
37 consistency::Checkpointed,
38 data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
39 error::Error,
40 filter, or_filter,
41 raw_query::RawQuery,
42 types::{
43 available_range::AvailableRange,
44 balance::{self, Balance},
45 base64::Base64,
46 big_int::BigInt,
47 coin::Coin,
48 coin_metadata::CoinMetadata,
49 cursor::{self, Page, RawPaginated, ScanLimited, Target},
50 digest::Digest,
51 display::{Display, DisplayEntry},
52 dynamic_field::{DynamicField, DynamicFieldName},
53 intersect,
54 iota_address::{IotaAddress, addr},
55 iota_names_registration::{NameFormat, NameRegistration},
56 move_object::MoveObject,
57 move_package::MovePackage,
58 owner::{Owner, OwnerImpl},
59 stake::StakedIota,
60 transaction_block,
61 transaction_block::{TransactionBlock, TransactionBlockFilter},
62 type_filter::{ExactTypeFilter, TypeFilter},
63 uint53::UInt53,
64 },
65};
66
/// An on-chain object as seen by this service, together with the bookkeeping needed to answer
/// follow-up queries about it.
#[derive(Clone, Debug)]
pub(crate) struct Object {
    /// Address (object ID) of this object.
    pub address: IotaAddress,
    /// What is known about the object's contents; see [`ObjectKind`].
    pub kind: ObjectKind,
    /// Checkpoint this object was viewed at; it is forwarded to the nested queries issued from
    /// this object (owner, previous transaction, received transactions, ...).
    pub checkpoint_viewed_at: u64,
    /// Version handed to the dynamic field lookups performed on this object (exposed through
    /// `Object::root_version`).
    root_version: u64,
}
88
/// Borrowing wrapper around an [`Object`] that carries the resolver logic the object's GraphQL
/// fields delegate to (`version`, `status`, `digest`, `owner`, `bcs`, `display`, ...).
pub(crate) struct ObjectImpl<'o>(pub &'o Object);
91
/// How much is known about an object's contents.
#[derive(Clone, Debug)]
#[expect(clippy::large_enum_variant)]
pub(crate) enum ObjectKind {
    /// Only the native object is available, with no stored row next to it (this is what
    /// `Object::from_native` produces).
    NotIndexed(NativeObject),
    /// The native object together with the stored row it was deserialized from.
    Indexed(NativeObject, StoredHistoryObject),
    /// No contents are available; the payload is the object's version.
    WrappedOrDeleted(u64),
}
104
// GraphQL enum mirroring the variants of the internal `ObjectKind` without their payloads. It is
// exposed in the schema under the name "ObjectKind".
//
// NOTE: plain `//` comments are used on purpose — async-graphql turns `///` doc comments on
// derived types into schema descriptions.
#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
#[graphql(name = "ObjectKind")]
pub enum ObjectStatus {
    // Counterpart of `ObjectKind::NotIndexed`.
    NotIndexed,
    // Counterpart of `ObjectKind::Indexed`.
    Indexed,
    // Counterpart of `ObjectKind::WrappedOrDeleted`; marked as deprecated in the schema.
    #[graphql(
        deprecation = "will be removed with v1.26, as such objects can be considered non-existent"
    )]
    WrappedOrDeleted,
}
120
// GraphQL input object naming one exact object version: address, version and digest.
// (`//` comments only: doc comments on an `InputObject` become schema descriptions.)
#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
pub(crate) struct ObjectRef {
    // Address (object ID) of the referenced object.
    pub address: IotaAddress,
    // Version of the referenced object.
    pub version: UInt53,
    // Digest of the referenced object.
    pub digest: Digest,
}
130
// GraphQL input object used to constrain object queries. Every field is optional; `apply` turns
// the fields that are set into SQL conditions and `intersect` combines two filters.
// (`//` comments only: doc comments on an `InputObject` become schema descriptions.)
#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectFilter {
    // Restrict results by object type (applied against the `object_type*` columns).
    pub type_: Option<TypeFilter>,

    // Restrict results to objects whose owner is this address (`apply` additionally requires the
    // owner type to be `OwnerType::Address`).
    pub owner: Option<IotaAddress>,

    // Restrict results to these object IDs, at any version.
    pub object_ids: Option<Vec<IotaAddress>>,

    // Restrict results to these exact (object ID, version) pairs.
    pub object_keys: Option<Vec<ObjectKey>>,
}
158
// GraphQL input object identifying an object at one specific version.
// (`//` comments only: doc comments on an `InputObject` become schema descriptions.)
#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectKey {
    // ID of the object.
    pub object_id: IotaAddress,
    // Version of the object.
    pub version: UInt53,
}
164
// GraphQL union describing who owns an object. `ObjectImpl::owner` builds it from the native
// owner of the object.
// (`//` comments only: doc comments on a `Union` become schema descriptions.)
#[derive(Union, Clone)]
pub(crate) enum ObjectOwner {
    // The object is immutable.
    Immutable(Immutable),
    // The object is shared.
    Shared(Shared),
    // The object is owned by another object; boxed because `Parent` embeds an `Object`.
    Parent(Box<Parent>),
    // The object is owned by an address.
    Address(AddressOwner),
}
173
// Payload of `ObjectOwner::Immutable`. It carries no information of its own.
// (`//` comments only: doc comments on a `SimpleObject` become schema descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct Immutable {
    // Placeholder field exposed as `_`; presumably present only because a GraphQL object type
    // needs at least one field — TODO confirm. `ObjectImpl::owner` always sets it to `None`.
    #[graphql(name = "_")]
    dummy: Option<bool>,
}
181
// Payload of `ObjectOwner::Shared`.
// (`//` comments only: doc comments on a `SimpleObject` become schema descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct Shared {
    // The initial shared version taken from the native owner.
    initial_shared_version: UInt53,
}
189
// Payload of `ObjectOwner::Parent`.
// (`//` comments only: doc comments on a `SimpleObject` become schema descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct Parent {
    // The owning object, or `None` when it could not be loaded (`ObjectImpl::owner` discards
    // lookup errors).
    parent: Option<Object>,
}
198
// Payload of `ObjectOwner::Address`.
// (`//` comments only: doc comments on a `SimpleObject` become schema descriptions.)
#[derive(SimpleObject, Clone)]
pub(crate) struct AddressOwner {
    // The owning address wrapped as an `Owner`.
    owner: Option<Owner>,
}
207
/// The ways a single object can be looked up through `Object::query`. Each variant is turned into
/// its own data-loader key type there.
pub(crate) enum ObjectLookup {
    /// Look the object up by ID only, as of the given checkpoint.
    LatestAt {
        /// Checkpoint the lookup is performed at.
        checkpoint_viewed_at: u64,
    },

    /// Look the object up relative to a parent's version.
    UnderParent {
        /// Version of the parent that bounds the lookup — presumably an upper bound on the
        /// child's version; TODO confirm against the loader for `ParentVersionKey`.
        parent_version: u64,
        /// Checkpoint the lookup is performed at.
        checkpoint_viewed_at: u64,
    },

    /// Look the object up at an exact version, as of the given checkpoint.
    VersionAt {
        /// Exact version requested.
        version: u64,
        /// Checkpoint the lookup is performed at.
        checkpoint_viewed_at: u64,
    },

    /// Look the object up at an exact version without specifying a checkpoint.
    OptimisticVersion {
        /// Exact version requested.
        version: u64,
    },
}
239
/// Pagination cursor for object queries: a [`HistoricalObjectCursor`] wrapped in a BCS-based
/// cursor.
pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;
241
/// Contents of an object pagination cursor. The serde renames keep the serialized field names
/// short; do not reorder or rename fields without considering already-issued cursors.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct HistoricalObjectCursor {
    /// Raw bytes of the object ID this cursor points at.
    #[serde(rename = "o")]
    object_id: Vec<u8>,
    /// Checkpoint the page containing this cursor was viewed at.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
253
// GraphQL interface `IObject`, listing the fields shared by every object-like type below
// (`Object`, `MovePackage`, `MoveObject`, `Coin`, `CoinMetadata`, `StakedIota`). The field
// declarations must stay in sync with the resolvers those types implement.
//
// NOTE: plain `//` comments are used on purpose — a doc comment on this enum would become the
// interface's schema description.
#[expect(clippy::duplicated_attributes)]
#[derive(Interface)]
#[graphql(
    name = "IObject",
    field(name = "version", ty = "UInt53"),
    field(
        name = "status",
        ty = "ObjectStatus",
        desc = r#"
 The current status of the object as read from the off-chain store. The
 possible states are:
 - NOT_INDEXED: The object is loaded from serialized data, such as the
 contents of a genesis or system package upgrade transaction.
 - INDEXED: The object is retrieved from the off-chain index and
 represents the most recent or historical state of the object.
 - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
 information can be loaded.
 "#
    ),
    field(
        name = "digest",
        ty = "Option<String>",
        desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
                string."
    ),
    field(
        name = "owner",
        ty = "Option<ObjectOwner>",
        desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
                Immutable and Shared Objects do not have owners."
    ),
    field(
        name = "previous_transaction_block",
        ty = "Option<TransactionBlock>",
        desc = "The transaction block that created this version of the object."
    ),
    field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
    field(
        name = "received_transaction_blocks",
        arg(name = "first", ty = "Option<u64>"),
        arg(name = "after", ty = "Option<transaction_block::Cursor>"),
        arg(name = "last", ty = "Option<u64>"),
        arg(name = "before", ty = "Option<transaction_block::Cursor>"),
        arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
        arg(name = "scan_limit", ty = "Option<u64>"),
        ty = "ScanConnection<String, TransactionBlock>",
        desc = "The transaction blocks that sent objects to this object."
    ),
    field(
        name = "bcs",
        ty = "Option<Base64>",
        desc = "The Base64-encoded BCS serialization of the object's content."
    )
)]
pub(crate) enum IObject {
    Object(Object),
    MovePackage(MovePackage),
    MoveObject(MoveObject),
    Coin(Coin),
    CoinMetadata(CoinMetadata),
    StakedIota(StakedIota),
}
318
/// Data-loader key built by `Object::query` for `ObjectLookup::VersionAt`: an object at an exact
/// version, as of a checkpoint.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct HistoricalKey {
    /// ID of the object to load.
    id: IotaAddress,
    /// Exact version requested.
    version: u64,
    /// Checkpoint the lookup is performed at.
    checkpoint_viewed_at: u64,
}
328
/// Data-loader key built by `Object::query` for `ObjectLookup::OptimisticVersion`: an object at
/// an exact version, with no checkpoint attached.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct OptimisticKey {
    /// ID of the object to load.
    id: IotaAddress,
    /// Exact version requested.
    version: u64,
}
337
/// Data-loader key built by `Object::query` for `ObjectLookup::UnderParent`: an object looked up
/// relative to a parent version, as of a checkpoint.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct ParentVersionKey {
    /// ID of the object to load.
    id: IotaAddress,
    /// Version of the parent that bounds the lookup.
    parent_version: u64,
    /// Checkpoint the lookup is performed at.
    checkpoint_viewed_at: u64,
}
349
/// Data-loader key built by `Object::query` for `ObjectLookup::LatestAt`: an object identified by
/// ID only, as of a checkpoint.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestAtKey {
    /// ID of the object to load.
    id: IotaAddress,
    /// Checkpoint the lookup is performed at.
    checkpoint_viewed_at: u64,
}
357
// GraphQL resolvers for `Object`. Every field is a thin forwarder: owner-style fields go through
// `OwnerImpl::from(self)`, object-specific fields through `ObjectImpl(self)`, and the two
// `as_move_*` fields through `TryFrom` conversions.
//
// NOTE: only plain `//` comments are added here. async-graphql derives schema descriptions from
// `///` doc comments on this impl and its methods, so doc comments would alter the schema.
#[Object]
impl Object {
    // Address of this object, as reported by the owner implementation.
    pub(crate) async fn address(&self) -> IotaAddress {
        OwnerImpl::from(self).address().await
    }

    // Paginated Move objects owned by this object, optionally narrowed by `filter`.
    pub(crate) async fn objects(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
        filter: Option<ObjectFilter>,
    ) -> Result<Connection<String, MoveObject>> {
        OwnerImpl::from(self)
            .objects(ctx, first, after, last, before, filter)
            .await
    }

    // Balance held by this object, optionally for the coin type given in `type_`.
    pub(crate) async fn balance(
        &self,
        ctx: &Context<'_>,
        type_: Option<ExactTypeFilter>,
    ) -> Result<Option<Balance>> {
        OwnerImpl::from(self).balance(ctx, type_).await
    }

    // Paginated balances held by this object.
    pub(crate) async fn balances(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<balance::Cursor>,
        last: Option<u64>,
        before: Option<balance::Cursor>,
    ) -> Result<Connection<String, Balance>> {
        OwnerImpl::from(self)
            .balances(ctx, first, after, last, before)
            .await
    }

    // Paginated coins owned by this object, optionally narrowed to the coin type in `type_`.
    pub(crate) async fn coins(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
        type_: Option<ExactTypeFilter>,
    ) -> Result<Connection<String, Coin>> {
        OwnerImpl::from(self)
            .coins(ctx, first, after, last, before, type_)
            .await
    }

    // Paginated `StakedIota` objects owned by this object.
    pub(crate) async fn staked_iotas(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, StakedIota>> {
        OwnerImpl::from(self)
            .staked_iotas(ctx, first, after, last, before)
            .await
    }

    // Default IOTA-Names name associated with this object, rendered in the requested `format`.
    pub(crate) async fn iota_names_default_name(
        &self,
        ctx: &Context<'_>,
        format: Option<NameFormat>,
    ) -> Result<Option<String>> {
        OwnerImpl::from(self)
            .iota_names_default_name(ctx, format)
            .await
    }

    // Paginated IOTA-Names registrations owned by this object.
    pub(crate) async fn iota_names_registrations(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, NameRegistration>> {
        OwnerImpl::from(self)
            .iota_names_registrations(ctx, first, after, last, before)
            .await
    }

    // Version of this object; see `ObjectImpl::version`.
    pub(crate) async fn version(&self) -> UInt53 {
        ObjectImpl(self).version().await
    }

    // Status derived from this object's `kind`; see `ObjectImpl::status`.
    pub(crate) async fn status(&self) -> ObjectStatus {
        ObjectImpl(self).status().await
    }

    // Base58 digest of the object, `None` when no native object is available.
    pub(crate) async fn digest(&self) -> Option<String> {
        ObjectImpl(self).digest().await
    }

    // Owner of the object, `None` when no native object is available.
    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
        ObjectImpl(self).owner(ctx).await
    }

    // Transaction block recorded as the object's previous transaction.
    pub(crate) async fn previous_transaction_block(
        &self,
        ctx: &Context<'_>,
    ) -> Result<Option<TransactionBlock>> {
        ObjectImpl(self).previous_transaction_block(ctx).await
    }

    // Storage rebate of the object, `None` when no native object is available.
    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
        ObjectImpl(self).storage_rebate().await
    }

    // Transaction blocks whose `recv_address` is this object. The declared query complexity is
    // the requested page size (`first`, else `last`, else `DEFAULT_PAGE_SIZE`) multiplied by the
    // complexity of the child selection.
    #[graphql(
        complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
    )]
    pub(crate) async fn received_transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        ObjectImpl(self)
            .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
            .await
    }

    // Base64-encoded BCS bytes of the object; see `ObjectImpl::bcs`.
    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
        ObjectImpl(self).bcs().await
    }

    // Rendered display entries for the object; see `ObjectImpl::display`.
    async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
        ObjectImpl(self).display(ctx).await
    }

    // Dynamic field named `name` on this object, looked up with this object's root version.
    async fn dynamic_field(
        &self,
        ctx: &Context<'_>,
        name: DynamicFieldName,
    ) -> Result<Option<DynamicField>> {
        OwnerImpl::from(self)
            .dynamic_field(ctx, name, Some(self.root_version()))
            .await
    }

    // Dynamic object field named `name` on this object, looked up with this object's root
    // version.
    async fn dynamic_object_field(
        &self,
        ctx: &Context<'_>,
        name: DynamicFieldName,
    ) -> Result<Option<DynamicField>> {
        OwnerImpl::from(self)
            .dynamic_object_field(ctx, name, Some(self.root_version()))
            .await
    }

    // Paginated dynamic fields of this object, looked up with this object's root version.
    async fn dynamic_fields(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, DynamicField>> {
        OwnerImpl::from(self)
            .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
            .await
    }

    // This object as a `MoveObject`; `None` when the conversion fails (the error is dropped).
    async fn as_move_object(&self) -> Option<MoveObject> {
        MoveObject::try_from(self).ok()
    }

    // This object as a `MovePackage`; `None` when the conversion fails (the error is dropped).
    async fn as_move_package(&self) -> Option<MovePackage> {
        MovePackage::try_from(self).ok()
    }
}
625
626impl ObjectImpl<'_> {
627 pub(crate) async fn version(&self) -> UInt53 {
628 self.0.version_impl().into()
629 }
630
631 pub(crate) async fn status(&self) -> ObjectStatus {
632 ObjectStatus::from(&self.0.kind)
633 }
634
635 pub(crate) async fn digest(&self) -> Option<String> {
636 self.0
637 .native_impl()
638 .map(|native| native.digest().to_base58())
639 }
640
641 pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
642 use NativeOwner as O;
643
644 let native = self.0.native_impl()?;
645
646 match native.owner {
647 O::Address(address) => {
648 let address = IotaAddress::from(address);
649 Some(ObjectOwner::Address(AddressOwner {
650 owner: Some(Owner {
651 address,
652 checkpoint_viewed_at: self.0.checkpoint_viewed_at,
653 root_version: None,
654 }),
655 }))
656 }
657 O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
658 O::Object(address) => {
659 let parent = Object::query(
660 ctx,
661 address.into(),
662 Object::latest_at(self.0.checkpoint_viewed_at),
663 )
664 .await
665 .ok()
666 .flatten();
667
668 Some(ObjectOwner::Parent(Box::new(Parent { parent })))
669 }
670 O::Shared(initial_shared_version) => Some(ObjectOwner::Shared(Shared {
671 initial_shared_version: initial_shared_version.as_u64().into(),
672 })),
673 _ => unimplemented!("a new Owner enum variant was added and needs to be handled"),
674 }
675 }
676
677 pub(crate) async fn previous_transaction_block(
678 &self,
679 ctx: &Context<'_>,
680 ) -> Result<Option<TransactionBlock>> {
681 let Some(native) = self.0.native_impl() else {
682 return Ok(None);
683 };
684 let digest = native.previous_transaction;
685 let key = transaction_block::DigestKey::new(digest.into(), self.0.checkpoint_viewed_at);
686
687 TransactionBlock::query(ctx, key.into()).await.extend()
688 }
689
690 pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
691 self.0
692 .native_impl()
693 .map(|native| BigInt::from(native.storage_rebate))
694 }
695
696 pub(crate) async fn received_transaction_blocks(
697 &self,
698 ctx: &Context<'_>,
699 first: Option<u64>,
700 after: Option<transaction_block::Cursor>,
701 last: Option<u64>,
702 before: Option<transaction_block::Cursor>,
703 filter: Option<TransactionBlockFilter>,
704 scan_limit: Option<u64>,
705 ) -> Result<ScanConnection<String, TransactionBlock>> {
706 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
707
708 let Some(filter) = filter
709 .unwrap_or_default()
710 .intersect(TransactionBlockFilter {
711 recv_address: Some(self.0.address),
712 ..Default::default()
713 })
714 else {
715 return Ok(ScanConnection::new(false, false));
716 };
717
718 TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
719 .await
720 .extend()
721 }
722
723 pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
724 use ObjectKind as K;
725 Ok(match &self.0.kind {
726 K::WrappedOrDeleted(_) => None,
727 K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),
731
732 K::NotIndexed(native) => {
733 let bytes = bcs::to_bytes(native)
734 .map_err(|e| {
735 Error::Internal(format!(
736 "Failed to serialize object at {}: {e}",
737 self.0.address
738 ))
739 })
740 .extend()?;
741 Some(Base64::from(&bytes))
742 }
743 })
744 }
745
746 pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
749 let Some(native) = self.0.native_impl() else {
750 return Ok(None);
751 };
752
753 let move_object = native
754 .data
755 .as_struct_opt()
756 .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
757 .extend()?;
758
759 let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
760 .await
761 .extend()?;
762
763 let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
764 .await
765 .extend()?
766 else {
767 return Ok(None);
768 };
769
770 Ok(Some(display.render(&move_struct).extend()?))
771 }
772}
773
774impl Object {
775 pub(crate) fn from_native(
790 address: IotaAddress,
791 native: NativeObject,
792 checkpoint_viewed_at: u64,
793 root_version: Option<u64>,
794 ) -> Object {
795 let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
796 Object {
797 address,
798 kind: ObjectKind::NotIndexed(native),
799 checkpoint_viewed_at,
800 root_version,
801 }
802 }
803
804 pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
805 use ObjectKind as K;
806
807 match &self.kind {
808 K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
809 K::WrappedOrDeleted(_) => None,
810 }
811 }
812
813 pub(crate) fn version_impl(&self) -> u64 {
814 use ObjectKind as K;
815
816 match &self.kind {
817 K::NotIndexed(native) | K::Indexed(native, _) => native.version().as_u64(),
818 K::WrappedOrDeleted(object_version) => *object_version,
819 }
820 }
821
822 pub(crate) fn root_version(&self) -> u64 {
826 self.root_version
827 }
828
    /// Queries a page of objects matching `filter`, viewed at `checkpoint_viewed_at`.
    ///
    /// This is `paginate_subtype` with the identity conversion (`Ok`), so the objects are
    /// returned as plain [`Object`]s.
    pub(crate) async fn paginate(
        db: &Db,
        page: Page<Cursor>,
        filter: ObjectFilter,
        checkpoint_viewed_at: u64,
    ) -> Result<Connection<String, Object>, Error> {
        Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
    }
843
    /// Queries a page of objects matching `filter` and converts each one with `downcast` before
    /// adding it to the connection.
    ///
    /// If the page's cursors carry a checkpoint, that checkpoint takes precedence over the
    /// `checkpoint_viewed_at` argument.
    ///
    /// # Errors
    ///
    /// Returns `Error::Client` when the availability check for the checkpoint fails. Also fails
    /// if the cursors are inconsistent, the database query fails, a row cannot be turned into an
    /// [`Object`], or `downcast` rejects an object.
    pub(crate) async fn paginate_subtype<T: OutputType>(
        db: &Db,
        page: Page<Cursor>,
        filter: ObjectFilter,
        checkpoint_viewed_at: u64,
        downcast: impl Fn(Object) -> Result<T, Error>,
    ) -> Result<Connection<String, T>, Error> {
        // `Some(checkpoint)` when the cursors on the page pin a checkpoint.
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

        // Copied out of `db` so the `move` closure below does not need to capture `db` itself.
        let max_available_range = db.max_available_range;

        let Some((prev, next, results)) = db
            .execute_repeatable(move |conn| {
                // `None` signals "checkpoint not available"; it is mapped to a client error
                // below.
                if !AvailableRange::is_checkpoint_in_backward_history_range(
                    conn,
                    checkpoint_viewed_at,
                    max_available_range,
                )? {
                    return Ok::<_, diesel::result::Error>(None);
                };

                let (prev, next, results_iter) = page.paginate_raw_query::<StoredBackwardObject>(
                    conn,
                    checkpoint_viewed_at,
                    backward_objects_query(&filter, checkpoint_viewed_at, &page),
                )?;
                let results = results_iter.collect();
                // Replace the versions of backward-history tombstones with the versions found in
                // `objects_version` (see `resolve_tombstone_versions`).
                let results = resolve_tombstone_versions(conn, results)?;
                Ok(Some((prev, next, results)))
            })
            .await?
        else {
            return Err(Error::Client(
                "Requested data is outside the available range".to_string(),
            ));
        };

        let mut conn: Connection<String, T> = Connection::new(prev, next);

        for stored in results {
            // The cursor must be taken before `stored` is consumed by the conversion below.
            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
            let stored_history = stored.into_stored_history(checkpoint_viewed_at);
            let object =
                Object::try_from_stored_history_object(stored_history, checkpoint_viewed_at, None)?;
            conn.edges.push(Edge::new(cursor, downcast(object)?));
        }

        Ok(conn)
    }
916
    /// Builds the lookup for an object identified by ID only, as of `checkpoint_viewed_at`.
    pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
        ObjectLookup::LatestAt {
            checkpoint_viewed_at,
        }
    }
923
    /// Builds the lookup for an object relative to `parent_version`, as of
    /// `checkpoint_viewed_at`.
    pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
        ObjectLookup::UnderParent {
            parent_version,
            checkpoint_viewed_at,
        }
    }
932
    /// Builds the lookup for an object at exactly `version`, as of `checkpoint_viewed_at`.
    pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
        ObjectLookup::VersionAt {
            version,
            checkpoint_viewed_at,
        }
    }
940
    /// Builds the lookup for an object at exactly `version`, with no checkpoint attached.
    pub(crate) fn at_optimistic_version(version: u64) -> ObjectLookup {
        ObjectLookup::OptimisticVersion { version }
    }
945
946 pub(crate) async fn query(
947 ctx: &Context<'_>,
948 id: IotaAddress,
949 key: ObjectLookup,
950 ) -> Result<Option<Self>, Error> {
951 let DataLoader(loader) = &ctx.data_unchecked();
952
953 match key {
954 ObjectLookup::VersionAt {
955 version,
956 checkpoint_viewed_at,
957 } => {
958 loader
959 .load_one(HistoricalKey {
960 id,
961 version,
962 checkpoint_viewed_at,
963 })
964 .await
965 }
966
967 ObjectLookup::OptimisticVersion { version } => {
968 loader.load_one(OptimisticKey { id, version }).await
969 }
970
971 ObjectLookup::UnderParent {
972 parent_version,
973 checkpoint_viewed_at,
974 } => {
975 loader
976 .load_one(ParentVersionKey {
977 id,
978 parent_version,
979 checkpoint_viewed_at,
980 })
981 .await
982 }
983
984 ObjectLookup::LatestAt {
985 checkpoint_viewed_at,
986 } => {
987 loader
988 .load_one(LatestAtKey {
989 id,
990 checkpoint_viewed_at,
991 })
992 .await
993 }
994 }
995 }
996
997 pub(crate) async fn query_singleton(
1001 db: &Db,
1002 type_: StructTag,
1003 checkpoint_viewed_at: u64,
1004 ) -> Result<Option<Object>, Error> {
1005 let filter = ObjectFilter {
1006 type_: Some(TypeFilter::ByType(type_)),
1007 ..Default::default()
1008 };
1009
1010 let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;
1011
1012 Ok(connection.edges.into_iter().next().map(|edge| edge.node))
1013 }
1014
1015 pub(crate) fn try_from_stored_history_object(
1027 history_object: StoredHistoryObject,
1028 checkpoint_viewed_at: u64,
1029 root_version: Option<u64>,
1030 ) -> Result<Self, Error> {
1031 let address = addr(&history_object.object_id)?;
1032
1033 let object_status =
1034 NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
1035 Error::Internal(format!(
1036 "Unknown object status {} for object {} at version {}",
1037 history_object.object_status, address, history_object.object_version
1038 ))
1039 })?;
1040
1041 match object_status {
1042 NativeObjectStatus::Active => {
1043 let Some(serialized_object) = &history_object.serialized_object else {
1044 return Err(Error::Internal(format!(
1045 "Live object {} at version {} cannot have missing serialized_object field",
1046 address, history_object.object_version
1047 )));
1048 };
1049
1050 let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
1051 Error::Internal(format!("Failed to deserialize object {address}"))
1052 })?;
1053
1054 let root_version =
1055 root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
1056 Ok(Self {
1057 address,
1058 kind: ObjectKind::Indexed(native_object, history_object),
1059 checkpoint_viewed_at,
1060 root_version,
1061 })
1062 }
1063 NativeObjectStatus::WrappedOrDeleted => Ok(Self {
1064 address,
1065 kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
1066 checkpoint_viewed_at,
1067 root_version: history_object.object_version as u64,
1068 }),
1069 }
1070 }
1071
1072 pub(crate) fn try_from_stored_object(
1073 stored_object: StoredObject,
1074 checkpoint_viewed_at: u64,
1075 ) -> Result<Self, Error> {
1076 let address = addr(&stored_object.object_id)?;
1077
1078 let native_object = bcs::from_bytes(&stored_object.serialized_object)
1079 .map_err(|_| Error::Internal(format!("Failed to deserialize object {address}")))?;
1080
1081 let root_version = version_for_dynamic_fields(&native_object);
1082
1083 let stored_history_like = StoredHistoryObject {
1084 object_id: stored_object.object_id,
1085 object_version: stored_object.object_version,
1086 object_digest: Some(stored_object.object_digest),
1087 object_status: NativeObjectStatus::Active as i16,
1088 checkpoint_sequence_number: checkpoint_viewed_at as i64,
1089 serialized_object: Some(stored_object.serialized_object),
1090 object_type: stored_object.object_type,
1091 object_type_package: stored_object.object_type_package,
1092 object_type_module: stored_object.object_type_module,
1093 object_type_name: stored_object.object_type_name,
1094 owner_type: Some(stored_object.owner_type),
1095 owner_id: stored_object.owner_id,
1096 coin_type: stored_object.coin_type,
1097 coin_balance: stored_object.coin_balance,
1098 df_kind: stored_object.df_kind,
1099 };
1100
1101 Ok(Self {
1102 address,
1103 kind: ObjectKind::Indexed(native_object, stored_history_like),
1104 checkpoint_viewed_at,
1105 root_version,
1106 })
1107 }
1108}
1109
1110fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
1121 native.as_inner().version().as_u64()
1122}
1123
1124impl ObjectFilter {
1125 pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
1131 macro_rules! intersect {
1132 ($field:ident, $body:expr) => {
1133 intersect::field(self.$field, other.$field, $body)
1134 };
1135 }
1136
1137 let keys = intersect::field(self.keys(), other.keys(), |k, l| {
1140 let mut combined = BTreeMap::new();
1141
1142 for (id, v) in k {
1143 if let Some(w) = l.get(&id).copied() {
1144 combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
1145 }
1146 }
1147
1148 (!combined.is_empty()).then_some(combined)
1152 })?;
1153
1154 let object_ids = {
1161 let partition: Vec<_> = keys
1162 .iter()
1163 .flatten()
1164 .filter_map(|(id, v)| v.is_none().then_some(*id))
1165 .collect();
1166
1167 (!partition.is_empty()).then_some(partition)
1168 };
1169
1170 let object_keys = {
1171 let partition: Vec<_> = keys
1172 .iter()
1173 .flatten()
1174 .filter_map(|(id, v)| {
1175 Some(ObjectKey {
1176 object_id: *id,
1177 version: (*v)?.into(),
1178 })
1179 })
1180 .collect();
1181
1182 (!partition.is_empty()).then_some(partition)
1183 };
1184
1185 Some(Self {
1186 type_: intersect!(type_, TypeFilter::intersect)?,
1187 owner: intersect!(owner, intersect::by_eq)?,
1188 object_ids,
1189 object_keys,
1190 })
1191 }
1192
1193 fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
1197 if self.object_keys.is_none() && self.object_ids.is_none() {
1198 return None;
1199 }
1200
1201 Some(BTreeMap::from_iter(
1202 self.object_keys
1203 .iter()
1204 .flatten()
1205 .map(|key| (key.object_id, Some(key.version.into())))
1206 .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
1209 ))
1210 }
1211
1212 pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
1215 if let Some(object_ids) = &self.object_ids {
1218 if object_ids.is_empty() {
1220 query = or_filter!(query, "1=0");
1221 } else {
1222 let mut inner = String::new();
1223 let mut prefix = "object_id IN (";
1224 for id in object_ids {
1225 write!(
1227 &mut inner,
1228 "{prefix}'\\x{}'::bytea",
1229 hex::encode(id.into_vec())
1230 )
1231 .unwrap();
1232 prefix = ", ";
1233 }
1234 inner.push(')');
1235 query = or_filter!(query, inner);
1236 }
1237 }
1238
1239 if let Some(object_keys) = &self.object_keys {
1240 if object_keys.is_empty() {
1242 query = or_filter!(query, "1=0");
1243 } else {
1244 let mut inner = String::new();
1245 let mut prefix = "(";
1246 for ObjectKey { object_id, version } in object_keys {
1247 write!(
1249 &mut inner,
1250 "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
1251 hex::encode(object_id.into_vec()),
1252 version
1253 )
1254 .unwrap();
1255 prefix = " OR ";
1256 }
1257 inner.push(')');
1258 query = or_filter!(query, inner);
1259 }
1260 }
1261
1262 if let Some(owner) = self.owner {
1263 query = filter!(
1264 query,
1265 format!(
1266 "owner_id = '\\x{}'::bytea AND owner_type = {}",
1267 hex::encode(owner.into_vec()),
1268 OwnerType::Address as i16
1269 )
1270 );
1271 }
1272
1273 if let Some(type_) = &self.type_ {
1274 return type_.apply_raw(
1275 query,
1276 "object_type",
1277 "object_type_package",
1278 "object_type_module",
1279 "object_type_name",
1280 );
1281 }
1282
1283 query
1284 }
1285}
1286
impl HistoricalObjectCursor {
    /// Creates a cursor pointing at the object with the raw ID bytes `object_id`, as viewed at
    /// `checkpoint_viewed_at`.
    pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
        Self {
            object_id,
            checkpoint_viewed_at,
        }
    }
}
1295
/// An object cursor reports the checkpoint that is stored inside it.
impl Checkpointed for Cursor {
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
1301
/// Object cursors override nothing here, so they get whatever `ScanLimited` provides by default.
impl ScanLimited for Cursor {}
1303
1304impl RawPaginated<Cursor> for StoredHistoryObject {
1305 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1306 filter!(
1307 query,
1308 format!(
1309 "candidates.object_id >= '\\x{}'::bytea",
1310 hex::encode(cursor.object_id.clone())
1311 )
1312 )
1313 }
1314
1315 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1316 filter!(
1317 query,
1318 format!(
1319 "candidates.object_id <= '\\x{}'::bytea",
1320 hex::encode(cursor.object_id.clone())
1321 )
1322 )
1323 }
1324
1325 fn order(asc: bool, query: RawQuery) -> RawQuery {
1326 if asc {
1327 query.order_by("candidates.object_id ASC")
1328 } else {
1329 query.order_by("candidates.object_id DESC")
1330 }
1331 }
1332}
1333
1334impl Target<Cursor> for StoredHistoryObject {
1335 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1336 Cursor::new(HistoricalObjectCursor::new(
1337 self.object_id.clone(),
1338 checkpoint_viewed_at,
1339 ))
1340 }
1341}
1342
/// Row shape returned by the raw backward-objects query used in `Object::paginate_subtype`.
///
/// It carries the same columns as `StoredHistoryObject` except the checkpoint sequence number,
/// plus the `from_backward_history` flag. Rows are loaded by column name, so field names must
/// match the column names of the query.
#[derive(diesel::QueryableByName, Clone, Debug)]
pub(crate) struct StoredBackwardObject {
    /// Raw bytes of the object ID.
    #[diesel(sql_type = sql_types::Binary)]
    pub object_id: Vec<u8>,
    /// Version of the object; may be rewritten by `resolve_tombstone_versions`.
    #[diesel(sql_type = sql_types::BigInt)]
    pub object_version: i64,
    /// Numeric object status; compared against `NativeObjectStatus` values.
    #[diesel(sql_type = sql_types::SmallInt)]
    pub object_status: i16,
    /// Digest of the object, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
    pub object_digest: Option<Vec<u8>>,
    /// Numeric owner type, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::SmallInt>)]
    pub owner_type: Option<i16>,
    /// Raw bytes of the owner ID, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
    pub owner_id: Option<Vec<u8>>,
    /// Full object type, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
    pub object_type: Option<String>,
    /// Package part of the object type, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
    pub object_type_package: Option<Vec<u8>>,
    /// Module part of the object type, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
    pub object_type_module: Option<String>,
    /// Name part of the object type, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
    pub object_type_name: Option<String>,
    /// BCS-serialized object, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Binary>)]
    pub serialized_object: Option<Vec<u8>>,
    /// Coin type, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::Text>)]
    pub coin_type: Option<String>,
    /// Coin balance, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::BigInt>)]
    pub coin_balance: Option<i64>,
    /// Dynamic field kind, if present.
    #[diesel(sql_type = sql_types::Nullable<sql_types::SmallInt>)]
    pub df_kind: Option<i16>,
    /// Whether the row came from backward history. `resolve_tombstone_versions` only rewrites
    /// the version of wrapped-or-deleted rows that have this flag set.
    #[diesel(sql_type = sql_types::Bool)]
    pub from_backward_history: bool,
}
1384
1385impl StoredBackwardObject {
1386 pub(crate) fn into_stored_history(self, checkpoint_viewed_at: u64) -> StoredHistoryObject {
1391 StoredHistoryObject {
1392 object_id: self.object_id,
1393 object_version: self.object_version,
1394 object_status: self.object_status,
1395 object_digest: self.object_digest,
1396 checkpoint_sequence_number: checkpoint_viewed_at as i64,
1397 owner_type: self.owner_type,
1398 owner_id: self.owner_id,
1399 object_type: self.object_type,
1400 object_type_package: self.object_type_package,
1401 object_type_module: self.object_type_module,
1402 object_type_name: self.object_type_name,
1403 serialized_object: self.serialized_object,
1404 coin_type: self.coin_type,
1405 coin_balance: self.coin_balance,
1406 df_kind: self.df_kind,
1407 }
1408 }
1409}
1410
1411pub(crate) fn resolve_tombstone_versions(
1422 conn: &mut crate::data::pg::PgConnection<'_>,
1423 results: Vec<StoredBackwardObject>,
1424) -> Result<Vec<StoredBackwardObject>, diesel::result::Error> {
1425 let (ids, versions): (Vec<Vec<u8>>, Vec<i64>) = results
1426 .iter()
1427 .filter(|r| {
1428 r.from_backward_history
1429 && r.object_status == NativeObjectStatus::WrappedOrDeleted as i16
1430 })
1431 .map(|r| (r.object_id.clone(), r.object_version))
1432 .unzip();
1433
1434 if ids.is_empty() {
1435 return Ok(results);
1436 }
1437
1438 let sql = "SELECT pairs.object_id, pairs.backward_history_version, \
1442 MAX(ov.object_version) AS real_version \
1443 FROM unnest($1::bytea[], $2::bigint[]) \
1444 AS pairs(object_id, backward_history_version) \
1445 LEFT JOIN objects_version ov \
1446 ON ov.object_id = pairs.object_id \
1447 AND ov.object_version <= pairs.backward_history_version \
1448 GROUP BY pairs.object_id, pairs.backward_history_version";
1449
1450 #[derive(diesel::QueryableByName)]
1451 struct ResolvedVersion {
1452 #[diesel(sql_type = sql_types::Binary)]
1453 object_id: Vec<u8>,
1454 #[diesel(sql_type = sql_types::BigInt)]
1455 backward_history_version: i64,
1456 #[diesel(sql_type = sql_types::Nullable<sql_types::BigInt>)]
1457 real_version: Option<i64>,
1458 }
1459
1460 let rows: Vec<ResolvedVersion> = conn.results(|| {
1461 diesel::sql_query(sql)
1462 .bind::<sql_types::Array<sql_types::Binary>, _>(ids.clone())
1463 .bind::<sql_types::Array<sql_types::BigInt>, _>(versions.clone())
1464 })?;
1465
1466 let resolved_map: HashMap<Vec<u8>, HashMap<i64, i64>> = rows
1468 .into_iter()
1469 .filter_map(|r| {
1470 r.real_version
1471 .map(|real| (r.object_id, r.backward_history_version, real))
1472 })
1473 .fold(HashMap::new(), |mut acc, (id, ver, real)| {
1474 acc.entry(id).or_default().insert(ver, real);
1475 acc
1476 });
1477
1478 Ok(results
1479 .into_iter()
1480 .map(|mut r| {
1481 if r.from_backward_history
1482 && r.object_status == NativeObjectStatus::WrappedOrDeleted as i16
1483 {
1484 if let Some(&real_version) = resolved_map
1485 .get(&r.object_id)
1486 .and_then(|versions| versions.get(&r.object_version))
1487 {
1488 r.object_version = real_version;
1489 }
1490 }
1491 r
1492 })
1493 .collect())
1494}
1495
1496impl RawPaginated<Cursor> for StoredBackwardObject {
1497 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1498 filter!(
1499 query,
1500 format!(
1501 "candidates.object_id >= '\\x{}'::bytea",
1502 hex::encode(cursor.object_id.clone())
1503 )
1504 )
1505 }
1506
1507 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1508 filter!(
1509 query,
1510 format!(
1511 "candidates.object_id <= '\\x{}'::bytea",
1512 hex::encode(cursor.object_id.clone())
1513 )
1514 )
1515 }
1516
1517 fn order(asc: bool, query: RawQuery) -> RawQuery {
1518 if asc {
1519 query.order_by("candidates.object_id ASC")
1520 } else {
1521 query.order_by("candidates.object_id DESC")
1522 }
1523 }
1524}
1525
1526impl Target<Cursor> for StoredBackwardObject {
1527 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1528 Cursor::new(HistoricalObjectCursor::new(
1529 self.object_id.clone(),
1530 checkpoint_viewed_at,
1531 ))
1532 }
1533}
1534
1535impl Loader<HistoricalKey> for Db {
1536 type Value = Object;
1537 type Error = Error;
1538
1539 async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
1540 use objects_history::dsl as h;
1541 use objects_version::dsl as v;
1542
1543 if keys.is_empty() {
1544 return Ok(HashMap::new());
1545 }
1546
1547 let id_versions: BTreeSet<_> = keys
1548 .iter()
1549 .map(|key| (key.id.into_vec(), key.version as i64))
1550 .collect();
1551
1552 let objects: Vec<StoredHistoryObject> = self
1553 .execute(move |conn| {
1554 conn.results(move || {
1555 let mut query = h::objects_history
1556 .inner_join(
1557 v::objects_version.on(v::cp_sequence_number
1558 .eq(h::checkpoint_sequence_number)
1559 .and(v::object_id.eq(h::object_id))
1560 .and(v::object_version.eq(h::object_version))),
1561 )
1562 .select(StoredHistoryObject::as_select())
1563 .into_boxed();
1564
1565 for (id, version) in id_versions.iter().cloned() {
1566 query =
1567 query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
1568 }
1569
1570 query
1571 })
1572 })
1573 .await
1574 .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;
1575
1576 let mut id_version_to_stored = BTreeMap::new();
1577 for stored in objects {
1578 let key = (addr(&stored.object_id)?, stored.object_version as u64);
1579 id_version_to_stored.insert(key, stored);
1580 }
1581
1582 let mut result = HashMap::new();
1583 for key in keys {
1584 let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
1585 continue;
1586 };
1587
1588 if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1592 continue;
1593 }
1594
1595 let object = Object::try_from_stored_history_object(
1596 stored.clone(),
1597 key.checkpoint_viewed_at,
1598 None,
1600 )?;
1601 result.insert(*key, object);
1602 }
1603
1604 Ok(result)
1605 }
1606}
1607
1608impl Loader<OptimisticKey> for Db {
1609 type Value = Object;
1610 type Error = Error;
1611
1612 async fn load(&self, keys: &[OptimisticKey]) -> Result<HashMap<OptimisticKey, Object>, Error> {
1613 use objects::dsl as o;
1614
1615 if keys.is_empty() {
1616 return Ok(HashMap::new());
1617 }
1618
1619 let id_versions: BTreeSet<_> = keys
1620 .iter()
1621 .map(|key| (key.id.into_vec(), key.version as i64))
1622 .collect();
1623
1624 let objects: Vec<StoredObject> = self
1625 .execute(move |conn| {
1626 conn.results(move || {
1627 let mut query = o::objects.select(StoredObject::as_select()).into_boxed();
1628 for (id, version) in id_versions.iter().cloned() {
1629 query =
1630 query.or_filter(o::object_id.eq(id).and(o::object_version.eq(version)));
1631 }
1632 query
1633 })
1634 })
1635 .await
1636 .map_err(|e| Error::Internal(format!("Failed to fetch optimistic objects: {e}")))?;
1637
1638 let mut result = HashMap::new();
1639 let id_version_to_stored = objects
1640 .into_iter()
1641 .map(|stored| {
1642 addr(&stored.object_id).map(|id| ((id, stored.object_version as u64), stored))
1643 })
1644 .collect::<Result<BTreeMap<_, _>, _>>()?;
1645
1646 let mut missing_keys = Vec::new();
1648 for key in keys {
1649 if let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) {
1650 let object = Object::try_from_stored_object(stored.clone(), u64::MAX)?;
1651 result.insert(*key, object);
1652 } else {
1653 missing_keys.push(*key);
1654 }
1655 }
1656
1657 if !missing_keys.is_empty() {
1659 let historical_keys: Vec<HistoricalKey> = missing_keys
1660 .iter()
1661 .map(|key| HistoricalKey {
1662 id: key.id,
1663 version: key.version,
1664 checkpoint_viewed_at: u64::MAX,
1665 })
1666 .collect();
1667
1668 let historical_result: HashMap<HistoricalKey, Object> =
1669 self.load(&historical_keys).await?;
1670
1671 for (historical_key, object) in historical_result {
1672 let optimistic_key = OptimisticKey {
1673 id: historical_key.id,
1674 version: historical_key.version,
1675 };
1676 result.insert(optimistic_key, object);
1677 }
1678 }
1679
1680 Ok(result)
1681 }
1682}
1683
1684impl Loader<ParentVersionKey> for Db {
1685 type Value = Object;
1686 type Error = Error;
1687
1688 async fn load(
1689 &self,
1690 keys: &[ParentVersionKey],
1691 ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
1692 #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
1695 struct GroupKey {
1696 checkpoint_viewed_at: u64,
1697 parent_version: u64,
1698 }
1699
1700 let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1701 for key in keys {
1702 let group_key = GroupKey {
1703 checkpoint_viewed_at: key.checkpoint_viewed_at,
1704 parent_version: key.parent_version,
1705 };
1706
1707 keys_by_cursor_and_parent_version
1708 .entry(group_key)
1709 .or_default()
1710 .insert(key.id.into_vec());
1711 }
1712
1713 let futures = keys_by_cursor_and_parent_version
1715 .into_iter()
1716 .map(|(group_key, ids)| {
1717 self.execute(move |conn| {
1718 let stored: Vec<StoredHistoryObject> = conn.results(move || {
1719 use objects_history::dsl as h;
1720 use objects_version::dsl as v;
1721
1722 h::objects_history
1723 .inner_join(
1724 v::objects_version.on(v::cp_sequence_number
1725 .eq(h::checkpoint_sequence_number)
1726 .and(v::object_id.eq(h::object_id))
1727 .and(v::object_version.eq(h::object_version))),
1728 )
1729 .select(StoredHistoryObject::as_select())
1730 .filter(v::object_id.eq_any(ids.iter().cloned()))
1731 .filter(v::object_version.le(group_key.parent_version as i64))
1732 .distinct_on(v::object_id)
1733 .order_by(v::object_id)
1734 .then_order_by(v::object_version.desc())
1735 .into_boxed()
1736 })?;
1737
1738 Ok::<_, diesel::result::Error>(
1739 stored
1740 .into_iter()
1741 .map(|stored| (group_key, stored))
1742 .collect::<Vec<_>>(),
1743 )
1744 })
1745 });
1746
1747 let groups = futures::future::join_all(futures).await;
1749
1750 let mut results = HashMap::new();
1751 for group in groups {
1752 for (group_key, stored) in
1753 group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1754 {
1755 if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1758 continue;
1759 }
1760
1761 let object = Object::try_from_stored_history_object(
1762 stored,
1763 group_key.checkpoint_viewed_at,
1764 Some(group_key.parent_version),
1767 )?;
1768
1769 let key = ParentVersionKey {
1770 id: object.address,
1771 checkpoint_viewed_at: group_key.checkpoint_viewed_at,
1772 parent_version: group_key.parent_version,
1773 };
1774
1775 results.insert(key, object);
1776 }
1777 }
1778
1779 Ok(results)
1780 }
1781}
1782
1783impl Loader<LatestAtKey> for Db {
1784 type Value = Object;
1785 type Error = Error;
1786
1787 async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
1788 let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1791
1792 for key in keys {
1793 keys_by_cursor_and_parent_version
1794 .entry(key.checkpoint_viewed_at)
1795 .or_default()
1796 .insert(key.id);
1797 }
1798
1799 let max_available_range = self.max_available_range;
1800
1801 let futures =
1803 keys_by_cursor_and_parent_version
1804 .into_iter()
1805 .map(|(checkpoint_viewed_at, ids)| {
1806 self.execute_repeatable(move |conn| {
1807 if !AvailableRange::is_checkpoint_in_backward_history_range(
1808 conn,
1809 checkpoint_viewed_at,
1810 max_available_range,
1811 )? {
1812 return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
1813 vec![],
1814 );
1815 };
1816
1817 let filter = ObjectFilter {
1818 object_ids: Some(ids.iter().cloned().collect()),
1819 ..Default::default()
1820 };
1821
1822 let results: Vec<StoredBackwardObject> = conn.results(move || {
1823 consistent::query(
1824 checkpoint_viewed_at,
1825 &Page::bounded(ids.len() as u64),
1826 |q| filter.apply(q),
1827 )
1828 .into_boxed()
1829 })?;
1830 let results = resolve_tombstone_versions(conn, results)?;
1831
1832 Ok(results
1833 .into_iter()
1834 .map(|r| {
1835 (
1836 checkpoint_viewed_at,
1837 r.into_stored_history(checkpoint_viewed_at),
1838 )
1839 })
1840 .collect())
1841 })
1842 });
1843
1844 let groups = futures::future::join_all(futures).await;
1846
1847 let mut results = HashMap::new();
1848 for group in groups {
1849 for (checkpoint_viewed_at, stored) in
1850 group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1851 {
1852 let object =
1853 Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
1854
1855 let key = LatestAtKey {
1856 id: object.address,
1857 checkpoint_viewed_at,
1858 };
1859
1860 results.insert(key, object);
1861 }
1862 }
1863
1864 Ok(results)
1865 }
1866}
1867
1868impl From<&ObjectKind> for ObjectStatus {
1869 fn from(kind: &ObjectKind) -> Self {
1870 match kind {
1871 ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
1872 ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
1873 ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
1874 }
1875 }
1876}
1877
1878impl From<&Object> for OwnerImpl {
1879 fn from(object: &Object) -> Self {
1880 OwnerImpl {
1881 address: object.address,
1882 checkpoint_viewed_at: object.checkpoint_viewed_at,
1883 }
1884 }
1885}
1886
1887pub(crate) async fn deserialize_move_struct(
1888 move_object: &NativeMoveObject,
1889 resolver: &PackageResolver,
1890) -> Result<(StructTag, MoveStruct), Error> {
1891 let struct_tag = move_object.struct_tag().clone();
1892 let contents = move_object.contents();
1893 let move_type_layout = resolver
1894 .type_layout(TypeTag::from(struct_tag.clone()))
1895 .await
1896 .map_err(|e| {
1897 Error::Internal(format!(
1898 "Error fetching layout for type {}: {e}",
1899 struct_tag.to_canonical_string(true)
1900 ))
1901 })?;
1902
1903 let MoveTypeLayout::Struct(layout) = move_type_layout else {
1904 return Err(Error::Internal("Object is not a move struct".to_string()));
1905 };
1906
1907 let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
1911 Error::Internal(format!(
1912 "Error deserializing move struct for type {}: {e}",
1913 struct_tag.to_canonical_string(true)
1914 ))
1915 })?;
1916
1917 Ok((struct_tag, move_struct))
1918}
1919
1920fn backward_objects_query(
1928 filter: &ObjectFilter,
1929 checkpoint_viewed_at: u64,
1930 page: &Page<Cursor>,
1931) -> RawQuery {
1932 if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
1933 let ids_only_filter = ObjectFilter {
1936 object_keys: None,
1937 ..filter.clone()
1938 };
1939 let (id_query, id_bindings) = consistent::query(checkpoint_viewed_at, page, move |query| {
1940 ids_only_filter.apply(query)
1941 })
1942 .finish();
1943
1944 let keys_filter: HistoricalFilter = ObjectFilter {
1945 object_ids: None,
1946 ..filter.clone()
1947 }
1948 .try_into()
1949 .expect("object_keys is Some by match-arm guard");
1950 let (key_query, key_bindings) =
1951 historical::query(checkpoint_viewed_at, page, &keys_filter).finish();
1952
1953 RawQuery::new(
1954 format!("SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates",),
1955 id_bindings.into_iter().chain(key_bindings).collect(),
1956 )
1957 .order_by("object_id")
1958 .limit(page.limit() as i64)
1959 } else if let Ok(keys_filter) = HistoricalFilter::try_from(filter.clone()) {
1960 historical::query(checkpoint_viewed_at, page, &keys_filter)
1961 } else {
1962 consistent::query(checkpoint_viewed_at, page, move |query| filter.apply(query))
1963 }
1964}
1965
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Intersecting owner filters: identical owners are kept, differing owners yield
    /// no filter.
    #[test]
    fn test_owner_filter_intersection() {
        let owned_by = |hex: &str| ObjectFilter {
            owner: Some(IotaAddress::from_str(hex).unwrap()),
            ..Default::default()
        };

        let f0 = owned_by("0x1");
        let f1 = owned_by("0x2");

        assert_eq!(f0.clone().intersect(f0.clone()), Some(f0.clone()));
        assert_eq!(f0.intersect(f1), None);
    }

    /// Intersecting filters that combine `object_ids` and `object_keys`.
    #[test]
    fn test_key_filter_intersection() {
        let [i1, i2, i3, i4] =
            ["0x1", "0x2", "0x3", "0x4"].map(|hex| IotaAddress::from_str(hex).unwrap());

        // Fresh copies of the object keys shared between the fixtures below.
        let k2v1 = || ObjectKey {
            object_id: i2,
            version: 1.into(),
        };
        let k2v2 = || ObjectKey {
            object_id: i2,
            version: 2.into(),
        };
        let k4v2 = || ObjectKey {
            object_id: i4,
            version: 2.into(),
        };

        let f0 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            object_keys: Some(vec![k2v1(), k4v2()]),
            ..Default::default()
        };

        let f1 = ObjectFilter {
            object_ids: Some(vec![i1, i2]),
            object_keys: Some(vec![k4v2()]),
            ..Default::default()
        };

        let f2 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            ..Default::default()
        };

        let f3 = ObjectFilter {
            object_keys: Some(vec![k2v2(), k4v2()]),
            ..Default::default()
        };

        let ids_and_keys = f0.clone().intersect(f1.clone());
        assert_eq!(
            ids_and_keys,
            Some(ObjectFilter {
                object_ids: Some(vec![i1]),
                object_keys: Some(vec![k2v1(), k4v2()]),
                ..Default::default()
            })
        );

        let ids_only = f1.clone().intersect(f2.clone());
        assert_eq!(
            ids_only,
            Some(ObjectFilter {
                object_ids: Some(vec![i1]),
                ..Default::default()
            })
        );

        let keys_only = f1.intersect(f3.clone());
        assert_eq!(
            keys_only,
            Some(ObjectFilter {
                object_keys: Some(vec![k2v2(), k4v2()]),
                ..Default::default()
            })
        );

        assert_eq!(f0.intersect(f3.clone()), None);

        assert_eq!(f2.intersect(f3), None);
    }
}