1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt::Write,
8};
9
10use async_graphql::{
11 connection::{Connection, CursorType, Edge},
12 dataloader::Loader,
13 *,
14};
15use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
16use iota_indexer::{
17 models::objects::StoredHistoryObject,
18 schema::{objects_history, objects_version},
19 types::{ObjectStatus as NativeObjectStatus, OwnerType},
20};
21use iota_types::{
22 TypeTag,
23 object::{
24 MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
25 bounded_visitor::BoundedVisitor,
26 },
27};
28use move_core_types::{
29 annotated_value::{MoveStruct, MoveTypeLayout},
30 language_storage::StructTag,
31};
32use serde::{Deserialize, Serialize};
33
34use crate::{
35 connection::ScanConnection,
36 consistency::{Checkpointed, View, build_objects_query},
37 data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
38 error::Error,
39 filter, or_filter,
40 raw_query::RawQuery,
41 types::{
42 available_range::AvailableRange,
43 balance::{self, Balance},
44 base64::Base64,
45 big_int::BigInt,
46 coin::Coin,
47 coin_metadata::CoinMetadata,
48 cursor::{self, Page, RawPaginated, ScanLimited, Target},
49 digest::Digest,
50 display::{Display, DisplayEntry},
51 dynamic_field::{DynamicField, DynamicFieldName},
52 intersect,
53 iota_address::{IotaAddress, addr},
54 iota_names_registration::{DomainFormat, IotaNamesRegistration},
55 move_object::MoveObject,
56 move_package::MovePackage,
57 owner::{Owner, OwnerImpl},
58 stake::StakedIota,
59 transaction_block,
60 transaction_block::{TransactionBlock, TransactionBlockFilter},
61 type_filter::{ExactTypeFilter, TypeFilter},
62 uint53::UInt53,
63 },
64};
65
66#[derive(Clone, Debug)]
67pub(crate) struct Object {
68 pub address: IotaAddress,
69 pub kind: ObjectKind,
70 pub checkpoint_viewed_at: u64,
72 root_version: u64,
86}
87
88pub(crate) struct ObjectImpl<'o>(pub &'o Object);
90
91#[derive(Clone, Debug)]
92#[expect(clippy::large_enum_variant)]
93pub(crate) enum ObjectKind {
94 NotIndexed(NativeObject),
97 Indexed(NativeObject, StoredHistoryObject),
99 WrappedOrDeleted(u64),
102}
103
104#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
105#[graphql(name = "ObjectKind")]
106pub enum ObjectStatus {
107 NotIndexed,
110 Indexed,
112 WrappedOrDeleted,
115}
116
117#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
118pub(crate) struct ObjectRef {
119 pub address: IotaAddress,
121 pub version: UInt53,
123 pub digest: Digest,
125}
126
127#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
135pub(crate) struct ObjectFilter {
136 pub type_: Option<TypeFilter>,
146
147 pub owner: Option<IotaAddress>,
149
150 pub object_ids: Option<Vec<IotaAddress>>,
152
153 pub object_keys: Option<Vec<ObjectKey>>,
156}
157
158#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
159pub(crate) struct ObjectKey {
160 pub object_id: IotaAddress,
161 pub version: UInt53,
162}
163
164#[derive(Union, Clone)]
166pub(crate) enum ObjectOwner {
167 Immutable(Immutable),
168 Shared(Shared),
169 Parent(Parent),
170 Address(AddressOwner),
171}
172
173#[derive(SimpleObject, Clone)]
176pub(crate) struct Immutable {
177 #[graphql(name = "_")]
178 dummy: Option<bool>,
179}
180
181#[derive(SimpleObject, Clone)]
185pub(crate) struct Shared {
186 initial_shared_version: UInt53,
187}
188
189#[derive(SimpleObject, Clone)]
194pub(crate) struct Parent {
195 parent: Option<Object>,
196}
197
198#[derive(SimpleObject, Clone)]
203pub(crate) struct AddressOwner {
204 owner: Option<Owner>,
205}
206
207pub(crate) enum ObjectLookup {
209 LatestAt {
210 checkpoint_viewed_at: u64,
212 },
213
214 UnderParent {
215 parent_version: u64,
219 checkpoint_viewed_at: u64,
221 },
222
223 VersionAt {
224 version: u64,
226 checkpoint_viewed_at: u64,
228 },
229}
230
231pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;
232
233#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
237pub(crate) struct HistoricalObjectCursor {
238 #[serde(rename = "o")]
239 object_id: Vec<u8>,
240 #[serde(rename = "c")]
242 checkpoint_viewed_at: u64,
243}
244
245#[expect(clippy::duplicated_attributes)]
248#[derive(Interface)]
249#[graphql(
250 name = "IObject",
251 field(name = "version", ty = "UInt53"),
252 field(
253 name = "status",
254 ty = "ObjectStatus",
255 desc = "The current status of the object as read from the off-chain store. The possible \
256 states are: NOT_INDEXED, the object is loaded from serialized data, such as the \
257 contents of a genesis or system package upgrade transaction. LIVE, the version \
258 returned is the most recent for the object, and it is not deleted or wrapped at \
259 that version. HISTORICAL, the object was referenced at a specific version or \
260 checkpoint, so is fetched from historical tables and may not be the latest version \
261 of the object. WRAPPED_OR_DELETED, the object is deleted or wrapped and only \
262 partial information can be loaded."
263 ),
264 field(
265 name = "digest",
266 ty = "Option<String>",
267 desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
268 string."
269 ),
270 field(
271 name = "owner",
272 ty = "Option<ObjectOwner>",
273 desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
274 Immutable and Shared Objects do not have owners."
275 ),
276 field(
277 name = "previous_transaction_block",
278 ty = "Option<TransactionBlock>",
279 desc = "The transaction block that created this version of the object."
280 ),
281 field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
282 field(
283 name = "received_transaction_blocks",
284 arg(name = "first", ty = "Option<u64>"),
285 arg(name = "after", ty = "Option<transaction_block::Cursor>"),
286 arg(name = "last", ty = "Option<u64>"),
287 arg(name = "before", ty = "Option<transaction_block::Cursor>"),
288 arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
289 arg(name = "scan_limit", ty = "Option<u64>"),
290 ty = "ScanConnection<String, TransactionBlock>",
291 desc = "The transaction blocks that sent objects to this object."
292 ),
293 field(
294 name = "bcs",
295 ty = "Option<Base64>",
296 desc = "The Base64-encoded BCS serialization of the object's content."
297 )
298)]
299pub(crate) enum IObject {
300 Object(Object),
301 MovePackage(MovePackage),
302 MoveObject(MoveObject),
303 Coin(Coin),
304 CoinMetadata(CoinMetadata),
305 StakedIota(StakedIota),
306}
307
308#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
312struct HistoricalKey {
313 id: IotaAddress,
314 version: u64,
315 checkpoint_viewed_at: u64,
316}
317
318#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
324struct ParentVersionKey {
325 id: IotaAddress,
326 parent_version: u64,
327 checkpoint_viewed_at: u64,
328}
329
330#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
333struct LatestAtKey {
334 id: IotaAddress,
335 checkpoint_viewed_at: u64,
336}
337
338#[Object]
343impl Object {
344 pub(crate) async fn address(&self) -> IotaAddress {
345 OwnerImpl::from(self).address().await
346 }
347
348 pub(crate) async fn objects(
350 &self,
351 ctx: &Context<'_>,
352 first: Option<u64>,
353 after: Option<Cursor>,
354 last: Option<u64>,
355 before: Option<Cursor>,
356 filter: Option<ObjectFilter>,
357 ) -> Result<Connection<String, MoveObject>> {
358 OwnerImpl::from(self)
359 .objects(ctx, first, after, last, before, filter)
360 .await
361 }
362
363 pub(crate) async fn balance(
366 &self,
367 ctx: &Context<'_>,
368 type_: Option<ExactTypeFilter>,
369 ) -> Result<Option<Balance>> {
370 OwnerImpl::from(self).balance(ctx, type_).await
371 }
372
373 pub(crate) async fn balances(
375 &self,
376 ctx: &Context<'_>,
377 first: Option<u64>,
378 after: Option<balance::Cursor>,
379 last: Option<u64>,
380 before: Option<balance::Cursor>,
381 ) -> Result<Connection<String, Balance>> {
382 OwnerImpl::from(self)
383 .balances(ctx, first, after, last, before)
384 .await
385 }
386
387 pub(crate) async fn coins(
392 &self,
393 ctx: &Context<'_>,
394 first: Option<u64>,
395 after: Option<Cursor>,
396 last: Option<u64>,
397 before: Option<Cursor>,
398 type_: Option<ExactTypeFilter>,
399 ) -> Result<Connection<String, Coin>> {
400 OwnerImpl::from(self)
401 .coins(ctx, first, after, last, before, type_)
402 .await
403 }
404
405 pub(crate) async fn staked_iotas(
407 &self,
408 ctx: &Context<'_>,
409 first: Option<u64>,
410 after: Option<Cursor>,
411 last: Option<u64>,
412 before: Option<Cursor>,
413 ) -> Result<Connection<String, StakedIota>> {
414 OwnerImpl::from(self)
415 .staked_iotas(ctx, first, after, last, before)
416 .await
417 }
418
419 pub(crate) async fn iota_names_default_name(
422 &self,
423 ctx: &Context<'_>,
424 format: Option<DomainFormat>,
425 ) -> Result<Option<String>> {
426 OwnerImpl::from(self)
427 .iota_names_default_name(ctx, format)
428 .await
429 }
430
431 pub(crate) async fn iota_names_registrations(
434 &self,
435 ctx: &Context<'_>,
436 first: Option<u64>,
437 after: Option<Cursor>,
438 last: Option<u64>,
439 before: Option<Cursor>,
440 ) -> Result<Connection<String, IotaNamesRegistration>> {
441 OwnerImpl::from(self)
442 .iota_names_registrations(ctx, first, after, last, before)
443 .await
444 }
445
446 pub(crate) async fn version(&self) -> UInt53 {
447 ObjectImpl(self).version().await
448 }
449
450 pub(crate) async fn status(&self) -> ObjectStatus {
460 ObjectImpl(self).status().await
461 }
462
463 pub(crate) async fn digest(&self) -> Option<String> {
466 ObjectImpl(self).digest().await
467 }
468
469 pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
472 ObjectImpl(self).owner(ctx).await
473 }
474
475 pub(crate) async fn previous_transaction_block(
477 &self,
478 ctx: &Context<'_>,
479 ) -> Result<Option<TransactionBlock>> {
480 ObjectImpl(self).previous_transaction_block(ctx).await
481 }
482
483 pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
487 ObjectImpl(self).storage_rebate().await
488 }
489
490 pub(crate) async fn received_transaction_blocks(
516 &self,
517 ctx: &Context<'_>,
518 first: Option<u64>,
519 after: Option<transaction_block::Cursor>,
520 last: Option<u64>,
521 before: Option<transaction_block::Cursor>,
522 filter: Option<TransactionBlockFilter>,
523 scan_limit: Option<u64>,
524 ) -> Result<ScanConnection<String, TransactionBlock>> {
525 ObjectImpl(self)
526 .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
527 .await
528 }
529
530 pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
532 ObjectImpl(self).bcs().await
533 }
534
535 async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
539 ObjectImpl(self).display(ctx).await
540 }
541
542 async fn dynamic_field(
549 &self,
550 ctx: &Context<'_>,
551 name: DynamicFieldName,
552 ) -> Result<Option<DynamicField>> {
553 OwnerImpl::from(self)
554 .dynamic_field(ctx, name, Some(self.root_version()))
555 .await
556 }
557
558 async fn dynamic_object_field(
567 &self,
568 ctx: &Context<'_>,
569 name: DynamicFieldName,
570 ) -> Result<Option<DynamicField>> {
571 OwnerImpl::from(self)
572 .dynamic_object_field(ctx, name, Some(self.root_version()))
573 .await
574 }
575
576 async fn dynamic_fields(
581 &self,
582 ctx: &Context<'_>,
583 first: Option<u64>,
584 after: Option<Cursor>,
585 last: Option<u64>,
586 before: Option<Cursor>,
587 ) -> Result<Connection<String, DynamicField>> {
588 OwnerImpl::from(self)
589 .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
590 .await
591 }
592
593 async fn as_move_object(&self) -> Option<MoveObject> {
595 MoveObject::try_from(self).ok()
596 }
597
598 async fn as_move_package(&self) -> Option<MovePackage> {
600 MovePackage::try_from(self).ok()
601 }
602}
603
604impl ObjectImpl<'_> {
605 pub(crate) async fn version(&self) -> UInt53 {
606 self.0.version_impl().into()
607 }
608
609 pub(crate) async fn status(&self) -> ObjectStatus {
610 ObjectStatus::from(&self.0.kind)
611 }
612
613 pub(crate) async fn digest(&self) -> Option<String> {
614 self.0
615 .native_impl()
616 .map(|native| native.digest().base58_encode())
617 }
618
619 pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
620 use NativeOwner as O;
621
622 let native = self.0.native_impl()?;
623
624 match native.owner {
625 O::AddressOwner(address) => {
626 let address = IotaAddress::from(address);
627 Some(ObjectOwner::Address(AddressOwner {
628 owner: Some(Owner {
629 address,
630 checkpoint_viewed_at: self.0.checkpoint_viewed_at,
631 root_version: None,
632 }),
633 }))
634 }
635 O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
636 O::ObjectOwner(address) => {
637 let parent = Object::query(
638 ctx,
639 address.into(),
640 Object::latest_at(self.0.checkpoint_viewed_at),
641 )
642 .await
643 .ok()
644 .flatten();
645
646 Some(ObjectOwner::Parent(Parent { parent }))
647 }
648 O::Shared {
649 initial_shared_version,
650 } => Some(ObjectOwner::Shared(Shared {
651 initial_shared_version: initial_shared_version.value().into(),
652 })),
653 }
654 }
655
656 pub(crate) async fn previous_transaction_block(
657 &self,
658 ctx: &Context<'_>,
659 ) -> Result<Option<TransactionBlock>> {
660 let Some(native) = self.0.native_impl() else {
661 return Ok(None);
662 };
663 let digest = native.previous_transaction;
664
665 TransactionBlock::query(ctx, digest.into(), self.0.checkpoint_viewed_at)
666 .await
667 .extend()
668 }
669
670 pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
671 self.0
672 .native_impl()
673 .map(|native| BigInt::from(native.storage_rebate))
674 }
675
676 pub(crate) async fn received_transaction_blocks(
677 &self,
678 ctx: &Context<'_>,
679 first: Option<u64>,
680 after: Option<transaction_block::Cursor>,
681 last: Option<u64>,
682 before: Option<transaction_block::Cursor>,
683 filter: Option<TransactionBlockFilter>,
684 scan_limit: Option<u64>,
685 ) -> Result<ScanConnection<String, TransactionBlock>> {
686 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
687
688 let Some(filter) = filter
689 .unwrap_or_default()
690 .intersect(TransactionBlockFilter {
691 recv_address: Some(self.0.address),
692 ..Default::default()
693 })
694 else {
695 return Ok(ScanConnection::new(false, false));
696 };
697
698 TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
699 .await
700 .extend()
701 }
702
703 pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
704 use ObjectKind as K;
705 Ok(match &self.0.kind {
706 K::WrappedOrDeleted(_) => None,
707 K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),
711
712 K::NotIndexed(native) => {
713 let bytes = bcs::to_bytes(native)
714 .map_err(|e| {
715 Error::Internal(format!(
716 "Failed to serialize object at {}: {e}",
717 self.0.address
718 ))
719 })
720 .extend()?;
721 Some(Base64::from(&bytes))
722 }
723 })
724 }
725
726 pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
729 let Some(native) = self.0.native_impl() else {
730 return Ok(None);
731 };
732
733 let move_object = native
734 .data
735 .try_as_move()
736 .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
737 .extend()?;
738
739 let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
740 .await
741 .extend()?;
742
743 let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
744 .await
745 .extend()?
746 else {
747 return Ok(None);
748 };
749
750 Ok(Some(display.render(&move_struct).extend()?))
751 }
752}
753
754impl Object {
755 pub(crate) fn from_native(
770 address: IotaAddress,
771 native: NativeObject,
772 checkpoint_viewed_at: u64,
773 root_version: Option<u64>,
774 ) -> Object {
775 let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
776 Object {
777 address,
778 kind: ObjectKind::NotIndexed(native),
779 checkpoint_viewed_at,
780 root_version,
781 }
782 }
783
784 pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
785 use ObjectKind as K;
786
787 match &self.kind {
788 K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
789 K::WrappedOrDeleted(_) => None,
790 }
791 }
792
793 pub(crate) fn version_impl(&self) -> u64 {
794 use ObjectKind as K;
795
796 match &self.kind {
797 K::NotIndexed(native) | K::Indexed(native, _) => native.version().value(),
798 K::WrappedOrDeleted(object_version) => *object_version,
799 }
800 }
801
802 pub(crate) fn root_version(&self) -> u64 {
806 self.root_version
807 }
808
809 pub(crate) async fn paginate(
816 db: &Db,
817 page: Page<Cursor>,
818 filter: ObjectFilter,
819 checkpoint_viewed_at: u64,
820 ) -> Result<Connection<String, Object>, Error> {
821 Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
822 }
823
824 pub(crate) async fn paginate_subtype<T: OutputType>(
841 db: &Db,
842 page: Page<Cursor>,
843 filter: ObjectFilter,
844 checkpoint_viewed_at: u64,
845 downcast: impl Fn(Object) -> Result<T, Error>,
846 ) -> Result<Connection<String, T>, Error> {
847 let cursor_viewed_at = page.validate_cursor_consistency()?;
852 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
853
854 let Some((prev, next, results)) = db
855 .execute_repeatable(move |conn| {
856 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
857 return Ok::<_, diesel::result::Error>(None);
858 };
859
860 Ok(Some(page.paginate_raw_query::<StoredHistoryObject>(
861 conn,
862 checkpoint_viewed_at,
863 objects_query(&filter, range, &page),
864 )?))
865 })
866 .await?
867 else {
868 return Err(Error::Client(
869 "Requested data is outside the available range".to_string(),
870 ));
871 };
872
873 let mut conn: Connection<String, T> = Connection::new(prev, next);
874
875 for stored in results {
876 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
879 let object =
880 Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
881 conn.edges.push(Edge::new(cursor, downcast(object)?));
882 }
883
884 Ok(conn)
885 }
886
887 pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
889 ObjectLookup::LatestAt {
890 checkpoint_viewed_at,
891 }
892 }
893
894 pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
897 ObjectLookup::UnderParent {
898 parent_version,
899 checkpoint_viewed_at,
900 }
901 }
902
903 pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
905 ObjectLookup::VersionAt {
906 version,
907 checkpoint_viewed_at,
908 }
909 }
910
911 pub(crate) async fn query(
912 ctx: &Context<'_>,
913 id: IotaAddress,
914 key: ObjectLookup,
915 ) -> Result<Option<Self>, Error> {
916 let DataLoader(loader) = &ctx.data_unchecked();
917
918 match key {
919 ObjectLookup::VersionAt {
920 version,
921 checkpoint_viewed_at,
922 } => {
923 loader
924 .load_one(HistoricalKey {
925 id,
926 version,
927 checkpoint_viewed_at,
928 })
929 .await
930 }
931
932 ObjectLookup::UnderParent {
933 parent_version,
934 checkpoint_viewed_at,
935 } => {
936 loader
937 .load_one(ParentVersionKey {
938 id,
939 parent_version,
940 checkpoint_viewed_at,
941 })
942 .await
943 }
944
945 ObjectLookup::LatestAt {
946 checkpoint_viewed_at,
947 } => {
948 loader
949 .load_one(LatestAtKey {
950 id,
951 checkpoint_viewed_at,
952 })
953 .await
954 }
955 }
956 }
957
958 pub(crate) async fn query_singleton(
962 db: &Db,
963 type_: StructTag,
964 checkpoint_viewed_at: u64,
965 ) -> Result<Option<Object>, Error> {
966 let filter = ObjectFilter {
967 type_: Some(TypeFilter::ByType(type_)),
968 ..Default::default()
969 };
970
971 let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;
972
973 Ok(connection.edges.into_iter().next().map(|edge| edge.node))
974 }
975
976 pub(crate) fn try_from_stored_history_object(
988 history_object: StoredHistoryObject,
989 checkpoint_viewed_at: u64,
990 root_version: Option<u64>,
991 ) -> Result<Self, Error> {
992 let address = addr(&history_object.object_id)?;
993
994 let object_status =
995 NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
996 Error::Internal(format!(
997 "Unknown object status {} for object {} at version {}",
998 history_object.object_status, address, history_object.object_version
999 ))
1000 })?;
1001
1002 match object_status {
1003 NativeObjectStatus::Active => {
1004 let Some(serialized_object) = &history_object.serialized_object else {
1005 return Err(Error::Internal(format!(
1006 "Live object {} at version {} cannot have missing serialized_object field",
1007 address, history_object.object_version
1008 )));
1009 };
1010
1011 let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
1012 Error::Internal(format!("Failed to deserialize object {address}"))
1013 })?;
1014
1015 let root_version =
1016 root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
1017 Ok(Self {
1018 address,
1019 kind: ObjectKind::Indexed(native_object, history_object),
1020 checkpoint_viewed_at,
1021 root_version,
1022 })
1023 }
1024 NativeObjectStatus::WrappedOrDeleted => Ok(Self {
1025 address,
1026 kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
1027 checkpoint_viewed_at,
1028 root_version: history_object.object_version as u64,
1029 }),
1030 }
1031 }
1032}
1033
1034fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
1045 native.as_inner().version().into()
1046}
1047
1048impl ObjectFilter {
1049 pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
1055 macro_rules! intersect {
1056 ($field:ident, $body:expr) => {
1057 intersect::field(self.$field, other.$field, $body)
1058 };
1059 }
1060
1061 let keys = intersect::field(self.keys(), other.keys(), |k, l| {
1064 let mut combined = BTreeMap::new();
1065
1066 for (id, v) in k {
1067 if let Some(w) = l.get(&id).copied() {
1068 combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
1069 }
1070 }
1071
1072 (!combined.is_empty()).then_some(combined)
1076 })?;
1077
1078 let object_ids = {
1085 let partition: Vec<_> = keys
1086 .iter()
1087 .flatten()
1088 .filter_map(|(id, v)| v.is_none().then_some(*id))
1089 .collect();
1090
1091 (!partition.is_empty()).then_some(partition)
1092 };
1093
1094 let object_keys = {
1095 let partition: Vec<_> = keys
1096 .iter()
1097 .flatten()
1098 .filter_map(|(id, v)| {
1099 Some(ObjectKey {
1100 object_id: *id,
1101 version: (*v)?.into(),
1102 })
1103 })
1104 .collect();
1105
1106 (!partition.is_empty()).then_some(partition)
1107 };
1108
1109 Some(Self {
1110 type_: intersect!(type_, TypeFilter::intersect)?,
1111 owner: intersect!(owner, intersect::by_eq)?,
1112 object_ids,
1113 object_keys,
1114 })
1115 }
1116
1117 fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
1121 if self.object_keys.is_none() && self.object_ids.is_none() {
1122 return None;
1123 }
1124
1125 Some(BTreeMap::from_iter(
1126 self.object_keys
1127 .iter()
1128 .flatten()
1129 .map(|key| (key.object_id, Some(key.version.into())))
1130 .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
1133 ))
1134 }
1135
1136 pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
1139 if let Some(object_ids) = &self.object_ids {
1142 if object_ids.is_empty() {
1144 query = or_filter!(query, "1=0");
1145 } else {
1146 let mut inner = String::new();
1147 let mut prefix = "object_id IN (";
1148 for id in object_ids {
1149 write!(
1151 &mut inner,
1152 "{prefix}'\\x{}'::bytea",
1153 hex::encode(id.into_vec())
1154 )
1155 .unwrap();
1156 prefix = ", ";
1157 }
1158 inner.push(')');
1159 query = or_filter!(query, inner);
1160 }
1161 }
1162
1163 if let Some(object_keys) = &self.object_keys {
1164 if object_keys.is_empty() {
1166 query = or_filter!(query, "1=0");
1167 } else {
1168 let mut inner = String::new();
1169 let mut prefix = "(";
1170 for ObjectKey { object_id, version } in object_keys {
1171 write!(
1173 &mut inner,
1174 "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
1175 hex::encode(object_id.into_vec()),
1176 version
1177 )
1178 .unwrap();
1179 prefix = " OR ";
1180 }
1181 inner.push(')');
1182 query = or_filter!(query, inner);
1183 }
1184 }
1185
1186 if let Some(owner) = self.owner {
1187 query = filter!(
1188 query,
1189 format!(
1190 "owner_id = '\\x{}'::bytea AND owner_type = {}",
1191 hex::encode(owner.into_vec()),
1192 OwnerType::Address as i16
1193 )
1194 );
1195 }
1196
1197 if let Some(type_) = &self.type_ {
1198 return type_.apply_raw(
1199 query,
1200 "object_type",
1201 "object_type_package",
1202 "object_type_module",
1203 "object_type_name",
1204 );
1205 }
1206
1207 query
1208 }
1209
1210 pub(crate) fn has_filters(&self) -> bool {
1211 self != &Default::default()
1212 }
1213}
1214
1215impl HistoricalObjectCursor {
1216 pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
1217 Self {
1218 object_id,
1219 checkpoint_viewed_at,
1220 }
1221 }
1222}
1223
1224impl Checkpointed for Cursor {
1225 fn checkpoint_viewed_at(&self) -> u64 {
1226 self.checkpoint_viewed_at
1227 }
1228}
1229
1230impl ScanLimited for Cursor {}
1231
1232impl RawPaginated<Cursor> for StoredHistoryObject {
1233 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1234 filter!(
1235 query,
1236 format!(
1237 "candidates.object_id >= '\\x{}'::bytea",
1238 hex::encode(cursor.object_id.clone())
1239 )
1240 )
1241 }
1242
1243 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1244 filter!(
1245 query,
1246 format!(
1247 "candidates.object_id <= '\\x{}'::bytea",
1248 hex::encode(cursor.object_id.clone())
1249 )
1250 )
1251 }
1252
1253 fn order(asc: bool, query: RawQuery) -> RawQuery {
1254 if asc {
1255 query.order_by("candidates.object_id ASC")
1256 } else {
1257 query.order_by("candidates.object_id DESC")
1258 }
1259 }
1260}
1261
1262impl Target<Cursor> for StoredHistoryObject {
1263 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1264 Cursor::new(HistoricalObjectCursor::new(
1265 self.object_id.clone(),
1266 checkpoint_viewed_at,
1267 ))
1268 }
1269}
1270
1271impl Loader<HistoricalKey> for Db {
1272 type Value = Object;
1273 type Error = Error;
1274
1275 async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
1276 use objects_history::dsl as h;
1277 use objects_version::dsl as v;
1278
1279 let id_versions: BTreeSet<_> = keys
1280 .iter()
1281 .map(|key| (key.id.into_vec(), key.version as i64))
1282 .collect();
1283
1284 let objects: Vec<StoredHistoryObject> = self
1285 .execute(move |conn| {
1286 conn.results(move || {
1287 let mut query = h::objects_history
1288 .inner_join(
1289 v::objects_version.on(v::cp_sequence_number
1290 .eq(h::checkpoint_sequence_number)
1291 .and(v::object_id.eq(h::object_id))
1292 .and(v::object_version.eq(h::object_version))),
1293 )
1294 .select(StoredHistoryObject::as_select())
1295 .into_boxed();
1296
1297 for (id, version) in id_versions.iter().cloned() {
1298 query =
1299 query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
1300 }
1301
1302 query
1303 })
1304 })
1305 .await
1306 .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;
1307
1308 let mut id_version_to_stored = BTreeMap::new();
1309 for stored in objects {
1310 let key = (addr(&stored.object_id)?, stored.object_version as u64);
1311 id_version_to_stored.insert(key, stored);
1312 }
1313
1314 let mut result = HashMap::new();
1315 for key in keys {
1316 let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
1317 continue;
1318 };
1319
1320 if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1324 continue;
1325 }
1326
1327 let object = Object::try_from_stored_history_object(
1328 stored.clone(),
1329 key.checkpoint_viewed_at,
1330 None,
1332 )?;
1333 result.insert(*key, object);
1334 }
1335
1336 Ok(result)
1337 }
1338}
1339
1340impl Loader<ParentVersionKey> for Db {
1341 type Value = Object;
1342 type Error = Error;
1343
1344 async fn load(
1345 &self,
1346 keys: &[ParentVersionKey],
1347 ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
1348 #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
1351 struct GroupKey {
1352 checkpoint_viewed_at: u64,
1353 parent_version: u64,
1354 }
1355
1356 let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1357 for key in keys {
1358 let group_key = GroupKey {
1359 checkpoint_viewed_at: key.checkpoint_viewed_at,
1360 parent_version: key.parent_version,
1361 };
1362
1363 keys_by_cursor_and_parent_version
1364 .entry(group_key)
1365 .or_default()
1366 .insert(key.id.into_vec());
1367 }
1368
1369 let futures = keys_by_cursor_and_parent_version
1371 .into_iter()
1372 .map(|(group_key, ids)| {
1373 self.execute(move |conn| {
1374 let stored: Vec<StoredHistoryObject> = conn.results(move || {
1375 use objects_history::dsl as h;
1376 use objects_version::dsl as v;
1377
1378 h::objects_history
1379 .inner_join(
1380 v::objects_version.on(v::cp_sequence_number
1381 .eq(h::checkpoint_sequence_number)
1382 .and(v::object_id.eq(h::object_id))
1383 .and(v::object_version.eq(h::object_version))),
1384 )
1385 .select(StoredHistoryObject::as_select())
1386 .filter(v::object_id.eq_any(ids.iter().cloned()))
1387 .filter(v::object_version.le(group_key.parent_version as i64))
1388 .distinct_on(v::object_id)
1389 .order_by(v::object_id)
1390 .then_order_by(v::object_version.desc())
1391 .into_boxed()
1392 })?;
1393
1394 Ok::<_, diesel::result::Error>(
1395 stored
1396 .into_iter()
1397 .map(|stored| (group_key, stored))
1398 .collect::<Vec<_>>(),
1399 )
1400 })
1401 });
1402
1403 let groups = futures::future::join_all(futures).await;
1405
1406 let mut results = HashMap::new();
1407 for group in groups {
1408 for (group_key, stored) in
1409 group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1410 {
1411 if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1414 continue;
1415 }
1416
1417 let object = Object::try_from_stored_history_object(
1418 stored,
1419 group_key.checkpoint_viewed_at,
1420 Some(group_key.parent_version),
1423 )?;
1424
1425 let key = ParentVersionKey {
1426 id: object.address,
1427 checkpoint_viewed_at: group_key.checkpoint_viewed_at,
1428 parent_version: group_key.parent_version,
1429 };
1430
1431 results.insert(key, object);
1432 }
1433 }
1434
1435 Ok(results)
1436 }
1437}
1438
1439impl Loader<LatestAtKey> for Db {
1440 type Value = Object;
1441 type Error = Error;
1442
1443 async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
1444 let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1447
1448 for key in keys {
1449 keys_by_cursor_and_parent_version
1450 .entry(key.checkpoint_viewed_at)
1451 .or_default()
1452 .insert(key.id);
1453 }
1454
1455 let futures =
1457 keys_by_cursor_and_parent_version
1458 .into_iter()
1459 .map(|(checkpoint_viewed_at, ids)| {
1460 self.execute_repeatable(move |conn| {
1461 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)?
1462 else {
1463 return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
1464 vec![],
1465 );
1466 };
1467
1468 let filter = ObjectFilter {
1469 object_ids: Some(ids.iter().cloned().collect()),
1470 ..Default::default()
1471 };
1472
1473 Ok(conn
1474 .results(move || {
1475 build_objects_query(
1476 View::Consistent,
1477 range,
1478 &Page::bounded(ids.len() as u64),
1479 |q| filter.apply(q),
1480 |q| q,
1481 )
1482 .into_boxed()
1483 })?
1484 .into_iter()
1485 .map(|r| (checkpoint_viewed_at, r))
1486 .collect())
1487 })
1488 });
1489
1490 let groups = futures::future::join_all(futures).await;
1492
1493 let mut results = HashMap::new();
1494 for group in groups {
1495 for (checkpoint_viewed_at, stored) in
1496 group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1497 {
1498 let object =
1499 Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
1500
1501 let key = LatestAtKey {
1502 id: object.address,
1503 checkpoint_viewed_at,
1504 };
1505
1506 results.insert(key, object);
1507 }
1508 }
1509
1510 Ok(results)
1511 }
1512}
1513
1514impl From<&ObjectKind> for ObjectStatus {
1515 fn from(kind: &ObjectKind) -> Self {
1516 match kind {
1517 ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
1518 ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
1519 ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
1520 }
1521 }
1522}
1523
1524impl From<&Object> for OwnerImpl {
1525 fn from(object: &Object) -> Self {
1526 OwnerImpl {
1527 address: object.address,
1528 checkpoint_viewed_at: object.checkpoint_viewed_at,
1529 }
1530 }
1531}
1532
1533pub(crate) async fn deserialize_move_struct(
1534 move_object: &NativeMoveObject,
1535 resolver: &PackageResolver,
1536) -> Result<(StructTag, MoveStruct), Error> {
1537 let struct_tag = StructTag::from(move_object.type_().clone());
1538 let contents = move_object.contents();
1539 let move_type_layout = resolver
1540 .type_layout(TypeTag::from(struct_tag.clone()))
1541 .await
1542 .map_err(|e| {
1543 Error::Internal(format!(
1544 "Error fetching layout for type {}: {e}",
1545 struct_tag.to_canonical_string(true)
1546 ))
1547 })?;
1548
1549 let MoveTypeLayout::Struct(layout) = move_type_layout else {
1550 return Err(Error::Internal("Object is not a move struct".to_string()));
1551 };
1552
1553 let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
1557 Error::Internal(format!(
1558 "Error deserializing move struct for type {}: {e}",
1559 struct_tag.to_canonical_string(true)
1560 ))
1561 })?;
1562
1563 Ok((struct_tag, move_struct))
1564}
1565
1566fn objects_query(filter: &ObjectFilter, range: AvailableRange, page: &Page<Cursor>) -> RawQuery
1571where
1572{
1573 if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
1574 let ids_only_filter = ObjectFilter {
1577 object_keys: None,
1578 ..filter.clone()
1579 };
1580 let (id_query, id_bindings) = build_objects_query(
1581 View::Consistent,
1582 range,
1583 page,
1584 move |query| ids_only_filter.apply(query),
1585 move |newer| newer,
1586 )
1587 .finish();
1588
1589 let keys_only_filter = ObjectFilter {
1590 object_ids: None,
1591 ..filter.clone()
1592 };
1593 let (key_query, key_bindings) = build_objects_query(
1594 View::Historical,
1595 range,
1596 page,
1597 move |query| keys_only_filter.apply(query),
1598 move |newer| newer,
1599 )
1600 .finish();
1601
1602 RawQuery::new(
1603 format!(
1604 "SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates",
1605 id_query = id_query,
1606 key_query = key_query,
1607 ),
1608 id_bindings.into_iter().chain(key_bindings).collect(),
1609 )
1610 .order_by("object_id")
1611 .limit(page.limit() as i64)
1612 } else {
1613 let view = if filter.object_keys.is_some() || !filter.has_filters() {
1615 View::Historical
1616 } else {
1617 View::Consistent
1618 };
1619
1620 build_objects_query(
1621 view,
1622 range,
1623 page,
1624 move |query| filter.apply(query),
1625 move |newer| newer,
1626 )
1627 }
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632 use std::str::FromStr;
1633
1634 use super::*;
1635
1636 #[test]
1637 fn test_owner_filter_intersection() {
1638 let f0 = ObjectFilter {
1639 owner: Some(IotaAddress::from_str("0x1").unwrap()),
1640 ..Default::default()
1641 };
1642
1643 let f1 = ObjectFilter {
1644 owner: Some(IotaAddress::from_str("0x2").unwrap()),
1645 ..Default::default()
1646 };
1647
1648 assert_eq!(f0.clone().intersect(f0.clone()), Some(f0.clone()));
1649 assert_eq!(f0.clone().intersect(f1.clone()), None);
1650 }
1651
1652 #[test]
1653 fn test_key_filter_intersection() {
1654 let i1 = IotaAddress::from_str("0x1").unwrap();
1655 let i2 = IotaAddress::from_str("0x2").unwrap();
1656 let i3 = IotaAddress::from_str("0x3").unwrap();
1657 let i4 = IotaAddress::from_str("0x4").unwrap();
1658
1659 let f0 = ObjectFilter {
1660 object_ids: Some(vec![i1, i3]),
1661 object_keys: Some(vec![
1662 ObjectKey {
1663 object_id: i2,
1664 version: 1.into(),
1665 },
1666 ObjectKey {
1667 object_id: i4,
1668 version: 2.into(),
1669 },
1670 ]),
1671 ..Default::default()
1672 };
1673
1674 let f1 = ObjectFilter {
1675 object_ids: Some(vec![i1, i2]),
1676 object_keys: Some(vec![ObjectKey {
1677 object_id: i4,
1678 version: 2.into(),
1679 }]),
1680 ..Default::default()
1681 };
1682
1683 let f2 = ObjectFilter {
1684 object_ids: Some(vec![i1, i3]),
1685 ..Default::default()
1686 };
1687
1688 let f3 = ObjectFilter {
1689 object_keys: Some(vec![
1690 ObjectKey {
1691 object_id: i2,
1692 version: 2.into(),
1693 },
1694 ObjectKey {
1695 object_id: i4,
1696 version: 2.into(),
1697 },
1698 ]),
1699 ..Default::default()
1700 };
1701
1702 assert_eq!(
1703 f0.clone().intersect(f1.clone()),
1704 Some(ObjectFilter {
1705 object_ids: Some(vec![i1]),
1706 object_keys: Some(vec![
1707 ObjectKey {
1708 object_id: i2,
1709 version: 1.into(),
1710 },
1711 ObjectKey {
1712 object_id: i4,
1713 version: 2.into(),
1714 },
1715 ]),
1716 ..Default::default()
1717 })
1718 );
1719
1720 assert_eq!(
1721 f1.clone().intersect(f2.clone()),
1722 Some(ObjectFilter {
1723 object_ids: Some(vec![i1]),
1724 ..Default::default()
1725 })
1726 );
1727
1728 assert_eq!(
1729 f1.clone().intersect(f3.clone()),
1730 Some(ObjectFilter {
1731 object_keys: Some(vec![
1732 ObjectKey {
1733 object_id: i2,
1734 version: 2.into(),
1735 },
1736 ObjectKey {
1737 object_id: i4,
1738 version: 2.into(),
1739 },
1740 ]),
1741 ..Default::default()
1742 })
1743 );
1744
1745 assert_eq!(f0.clone().intersect(f3.clone()), None);
1747
1748 assert_eq!(f2.clone().intersect(f3.clone()), None);
1750 }
1751}