1use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{
8 connection::{Connection, CursorType, Edge},
9 dataloader::Loader,
10 *,
11};
12use diesel::{
13 BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Selectable,
14 prelude::QueryableByName,
15};
16use iota_indexer::{models::objects::StoredHistoryObject, schema::packages};
17use iota_package_resolver::{Package as ParsedMovePackage, error::Error as PackageCacheError};
18use iota_types::{is_system_package, move_package::MovePackage as NativeMovePackage, object::Data};
19use serde::{Deserialize, Serialize};
20
21use crate::{
22 connection::ScanConnection,
23 consistency::{Checkpointed, ConsistentNamedCursor},
24 data::{DataLoader, Db, DbConnection, QueryExecutor},
25 error::Error,
26 filter, query,
27 raw_query::RawQuery,
28 types::{
29 balance::{self, Balance},
30 base64::Base64,
31 big_int::BigInt,
32 coin::Coin,
33 cursor::{BcsCursor, JsonCursor, Page, RawPaginated, ScanLimited, Target},
34 iota_address::{IotaAddress, addr},
35 iota_names_registration::{DomainFormat, IotaNamesRegistration},
36 move_module::MoveModule,
37 move_object::MoveObject,
38 object::{self, Object, ObjectFilter, ObjectImpl, ObjectOwner, ObjectStatus},
39 owner::OwnerImpl,
40 stake::StakedIota,
41 transaction_block::{self, TransactionBlock, TransactionBlockFilter},
42 type_filter::ExactTypeFilter,
43 uint53::UInt53,
44 },
45};
46
/// A Move package: the generic object view plus the package's native contents.
#[derive(Clone)]
pub(crate) struct MovePackage {
    /// The `Object` this package was downcast from (see `TryFrom<&Object>` in this file).
    pub super_: Object,

    /// The package payload, cloned out of the object's `Data::Package` variant.
    pub native: NativeMovePackage,
}
56
// Checkpoint bounds consumed by `MovePackage::paginate_by_checkpoint`. Both bounds are exclusive.
//
// Plain `//` comments are used because async-graphql turns `///` doc comments on an
// `InputObject` into schema descriptions.
#[derive(InputObject, Debug, Default, Clone)]
pub(crate) struct MovePackageCheckpointFilter {
    // Keep only rows whose checkpoint sequence number is strictly greater than this value.
    pub after_checkpoint: Option<UInt53>,

    // Keep only rows whose checkpoint sequence number is strictly less than this value.
    pub before_checkpoint: Option<UInt53>,
}
70
// Version bounds consumed by the package-version queries at the bottom of this file. Both bounds
// are exclusive.
//
// Plain `//` comments are used because async-graphql turns `///` doc comments on an
// `InputObject` into schema descriptions.
#[derive(InputObject, Debug, Default, Clone)]
pub(crate) struct MovePackageVersionFilter {
    // Keep only package versions strictly greater than this value.
    pub after_version: Option<UInt53>,

    // Keep only package versions strictly less than this value.
    pub before_version: Option<UInt53>,
}
83
/// The ways `MovePackage::query` can resolve an address to a package.
pub(crate) enum PackageLookup {
    /// Load the object stored at exactly the given address, as of `checkpoint_viewed_at`.
    ById { checkpoint_viewed_at: u64 },

    /// Load the package at `version`. System packages are read at that object version directly;
    /// other packages are first translated to the address holding that version.
    Versioned {
        version: u64,
        checkpoint_viewed_at: u64,
    },

    /// Load the latest version of the package as of `checkpoint_viewed_at`. Non-system packages
    /// are first translated to the address holding their latest version.
    Latest { checkpoint_viewed_at: u64 },
}
105
// One entry of a package's linkage table, as built by the `linkage` resolver.
//
// Plain `//` comments are used because async-graphql turns `///` doc comments on a
// `SimpleObject` into schema descriptions.
#[derive(SimpleObject)]
struct Linkage {
    // The key of the linkage-table entry (the dependency's runtime ID).
    original_id: IotaAddress,

    // The `upgraded_id` recorded for that dependency.
    upgraded_id: IotaAddress,

    // The `upgraded_version` recorded for that dependency.
    version: UInt53,
}
120
// One entry of a package's type-origin table, as built by the `type_origins` resolver.
//
// Plain `//` comments are used because async-graphql turns `///` doc comments on a
// `SimpleObject` into schema descriptions.
#[derive(SimpleObject)]
struct TypeOrigin {
    // Name of the module the type belongs to.
    module: String,

    // Name of the datatype; exposed to GraphQL as `struct`.
    #[graphql(name = "struct")]
    struct_: String,

    // The package ID recorded as the origin of the type.
    defining_id: IotaAddress,
}
134
/// Row shape returned by the raw package queries in this file: an `original_id` column followed
/// by the columns of a historical object row.
#[derive(Selectable, QueryableByName)]
#[diesel(table_name = packages)]
struct StoredHistoryPackage {
    /// The `original_id` column selected by the query.
    original_id: Vec<u8>,
    /// The remaining columns, embedded as a stored historical object.
    #[diesel(embed)]
    object: StoredHistoryObject,
}
144
/// Error returned by `TryFrom<&Object> for MovePackage` when the object is not a package.
pub(crate) struct MovePackageDowncastError;

/// Cursor used to paginate the modules of a package (keyed by module name).
pub(crate) type CModule = JsonCursor<ConsistentNamedCursor>;
/// Cursor used to paginate packages and package versions.
pub(crate) type Cursor = BcsCursor<PackageCursor>;
149
/// Position of a package row in the ordering used by `RawPaginated for StoredHistoryPackage`:
/// checkpoint sequence number, then original ID, then version.
///
/// NOTE(review): this is wrapped in `BcsCursor`, so it is presumably BCS-encoded; if so, field
/// order is part of the cursor wire format — confirm before reordering fields.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct PackageCursor {
    /// Checkpoint sequence number of the row the cursor points at.
    pub checkpoint_sequence_number: u64,
    /// `original_id` of the row the cursor points at.
    pub original_id: Vec<u8>,
    /// Object version of the row the cursor points at.
    pub package_version: u64,
    /// The checkpoint the paginated query was viewed at when the cursor was created.
    pub checkpoint_viewed_at: u64,
}
161
/// Data-loader key: find the address of the package that shares an original ID with `address`
/// and has the given `version`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct PackageVersionKey {
    /// Address of any version of the package.
    address: IotaAddress,
    /// The version being looked up.
    version: u64,
}
175
/// Data-loader key: find the address of the latest version of the package at `address`,
/// considering only versions recorded at or before `checkpoint_viewed_at`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestKey {
    /// Address of any version of the package.
    address: IotaAddress,
    /// Upper bound (inclusive) on the checkpoint of the versions considered.
    checkpoint_viewed_at: u64,
}
184
185#[Object]
189impl MovePackage {
190 pub(crate) async fn address(&self) -> IotaAddress {
191 OwnerImpl::from(&self.super_).address().await
192 }
193
194 pub(crate) async fn objects(
199 &self,
200 ctx: &Context<'_>,
201 first: Option<u64>,
202 after: Option<object::Cursor>,
203 last: Option<u64>,
204 before: Option<object::Cursor>,
205 filter: Option<ObjectFilter>,
206 ) -> Result<Connection<String, MoveObject>> {
207 OwnerImpl::from(&self.super_)
208 .objects(ctx, first, after, last, before, filter)
209 .await
210 }
211
212 pub(crate) async fn balance(
218 &self,
219 ctx: &Context<'_>,
220 type_: Option<ExactTypeFilter>,
221 ) -> Result<Option<Balance>> {
222 OwnerImpl::from(&self.super_).balance(ctx, type_).await
223 }
224
225 pub(crate) async fn balances(
230 &self,
231 ctx: &Context<'_>,
232 first: Option<u64>,
233 after: Option<balance::Cursor>,
234 last: Option<u64>,
235 before: Option<balance::Cursor>,
236 ) -> Result<Connection<String, Balance>> {
237 OwnerImpl::from(&self.super_)
238 .balances(ctx, first, after, last, before)
239 .await
240 }
241
242 pub(crate) async fn coins(
250 &self,
251 ctx: &Context<'_>,
252 first: Option<u64>,
253 after: Option<object::Cursor>,
254 last: Option<u64>,
255 before: Option<object::Cursor>,
256 type_: Option<ExactTypeFilter>,
257 ) -> Result<Connection<String, Coin>> {
258 OwnerImpl::from(&self.super_)
259 .coins(ctx, first, after, last, before, type_)
260 .await
261 }
262
263 pub(crate) async fn staked_iotas(
268 &self,
269 ctx: &Context<'_>,
270 first: Option<u64>,
271 after: Option<object::Cursor>,
272 last: Option<u64>,
273 before: Option<object::Cursor>,
274 ) -> Result<Connection<String, StakedIota>> {
275 OwnerImpl::from(&self.super_)
276 .staked_iotas(ctx, first, after, last, before)
277 .await
278 }
279
280 pub(crate) async fn iota_names_default_name(
283 &self,
284 ctx: &Context<'_>,
285 format: Option<DomainFormat>,
286 ) -> Result<Option<String>> {
287 OwnerImpl::from(&self.super_)
288 .iota_names_default_name(ctx, format)
289 .await
290 }
291
292 pub(crate) async fn iota_names_registrations(
298 &self,
299 ctx: &Context<'_>,
300 first: Option<u64>,
301 after: Option<object::Cursor>,
302 last: Option<u64>,
303 before: Option<object::Cursor>,
304 ) -> Result<Connection<String, IotaNamesRegistration>> {
305 OwnerImpl::from(&self.super_)
306 .iota_names_registrations(ctx, first, after, last, before)
307 .await
308 }
309
310 pub(crate) async fn version(&self) -> UInt53 {
311 ObjectImpl(&self.super_).version().await
312 }
313
314 pub(crate) async fn status(&self) -> ObjectStatus {
324 ObjectImpl(&self.super_).status().await
325 }
326
327 pub(crate) async fn digest(&self) -> Option<String> {
330 ObjectImpl(&self.super_).digest().await
331 }
332
333 pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
336 ObjectImpl(&self.super_).owner(ctx).await
337 }
338
339 pub(crate) async fn previous_transaction_block(
341 &self,
342 ctx: &Context<'_>,
343 ) -> Result<Option<TransactionBlock>> {
344 ObjectImpl(&self.super_)
345 .previous_transaction_block(ctx)
346 .await
347 }
348
349 pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
356 ObjectImpl(&self.super_).storage_rebate().await
357 }
358
359 pub(crate) async fn received_transaction_blocks(
387 &self,
388 ctx: &Context<'_>,
389 first: Option<u64>,
390 after: Option<transaction_block::Cursor>,
391 last: Option<u64>,
392 before: Option<transaction_block::Cursor>,
393 filter: Option<TransactionBlockFilter>,
394 scan_limit: Option<u64>,
395 ) -> Result<ScanConnection<String, TransactionBlock>> {
396 ObjectImpl(&self.super_)
397 .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
398 .await
399 }
400
401 pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
403 ObjectImpl(&self.super_).bcs().await
404 }
405
406 async fn package_at_version(
409 &self,
410 ctx: &Context<'_>,
411 version: u64,
412 ) -> Result<Option<MovePackage>> {
413 MovePackage::query(
414 ctx,
415 self.super_.address,
416 MovePackage::by_version(version, self.checkpoint_viewed_at_impl()),
417 )
418 .await
419 .extend()
420 }
421
422 async fn package_versions(
426 &self,
427 ctx: &Context<'_>,
428 first: Option<u64>,
429 after: Option<Cursor>,
430 last: Option<u64>,
431 before: Option<Cursor>,
432 filter: Option<MovePackageVersionFilter>,
433 ) -> Result<Connection<String, MovePackage>> {
434 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
435
436 MovePackage::paginate_by_version(
437 ctx.data_unchecked(),
438 page,
439 self.super_.address,
440 filter,
441 self.checkpoint_viewed_at_impl(),
442 )
443 .await
444 .extend()
445 }
446
447 async fn latest_package(&self, ctx: &Context<'_>) -> Result<MovePackage> {
450 Ok(MovePackage::query(
451 ctx,
452 self.super_.address,
453 MovePackage::latest_at(self.checkpoint_viewed_at_impl()),
454 )
455 .await
456 .extend()?
457 .ok_or_else(|| Error::Internal("No latest version found".to_string()))?)
458 }
459
460 async fn module(&self, name: String) -> Result<Option<MoveModule>> {
463 self.module_impl(&name).extend()
464 }
465
466 pub async fn modules(
468 &self,
469 ctx: &Context<'_>,
470 first: Option<u64>,
471 after: Option<CModule>,
472 last: Option<u64>,
473 before: Option<CModule>,
474 ) -> Result<Option<Connection<String, MoveModule>>> {
475 use std::ops::Bound as B;
476
477 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
478 let cursor_viewed_at = page.validate_cursor_consistency()?;
479 let checkpoint_viewed_at =
480 cursor_viewed_at.unwrap_or_else(|| self.checkpoint_viewed_at_impl());
481
482 let parsed = self.parsed_package()?;
483 let module_range = parsed.modules().range::<String, _>((
484 page.after().map_or(B::Unbounded, |a| B::Excluded(&a.name)),
485 page.before().map_or(B::Unbounded, |b| B::Excluded(&b.name)),
486 ));
487
488 let mut connection = Connection::new(false, false);
489 let modules = if page.is_from_front() {
490 module_range.take(page.limit()).collect()
491 } else {
492 let mut ms: Vec<_> = module_range.rev().take(page.limit()).collect();
493 ms.reverse();
494 ms
495 };
496
497 connection.has_previous_page = modules.first().is_some_and(|(fst, _)| {
498 parsed
499 .modules()
500 .range::<String, _>((B::Unbounded, B::Excluded(*fst)))
501 .next()
502 .is_some()
503 });
504
505 connection.has_next_page = modules.last().is_some_and(|(lst, _)| {
506 parsed
507 .modules()
508 .range::<String, _>((B::Excluded(*lst), B::Unbounded))
509 .next()
510 .is_some()
511 });
512
513 for (name, parsed) in modules {
514 let Some(native) = self.native.serialized_module_map().get(name) else {
515 return Err(Error::Internal(format!(
516 "Module '{name}' exists in PackageCache but not in serialized map.",
517 ))
518 .extend());
519 };
520
521 let cursor = JsonCursor::new(ConsistentNamedCursor {
522 name: name.clone(),
523 c: checkpoint_viewed_at,
524 })
525 .encode_cursor();
526 connection.edges.push(Edge::new(
527 cursor,
528 MoveModule {
529 storage_id: self.super_.address,
530 native: native.clone(),
531 parsed: parsed.clone(),
532 checkpoint_viewed_at,
533 },
534 ))
535 }
536
537 if connection.edges.is_empty() {
538 Ok(None)
539 } else {
540 Ok(Some(connection))
541 }
542 }
543
544 async fn linkage(&self) -> Option<Vec<Linkage>> {
546 let linkage = self
547 .native
548 .linkage_table()
549 .iter()
550 .map(|(&runtime_id, upgrade_info)| Linkage {
551 original_id: runtime_id.into(),
552 upgraded_id: upgrade_info.upgraded_id.into(),
553 version: upgrade_info.upgraded_version.value().into(),
554 })
555 .collect();
556
557 Some(linkage)
558 }
559
560 async fn type_origins(&self) -> Option<Vec<TypeOrigin>> {
562 let type_origins = self
563 .native
564 .type_origin_table()
565 .iter()
566 .map(|origin| TypeOrigin {
567 module: origin.module_name.clone(),
568 struct_: origin.datatype_name.clone(),
569 defining_id: origin.package.into(),
570 })
571 .collect();
572
573 Some(type_origins)
574 }
575
576 async fn module_bcs(&self) -> Result<Option<Base64>> {
580 let bcs = bcs::to_bytes(self.native.serialized_module_map())
581 .map_err(|_| {
582 Error::Internal(format!("Failed to serialize package {}", self.native.id()))
583 })
584 .extend()?;
585
586 Ok(Some(bcs.into()))
587 }
588}
589
590impl MovePackage {
591 fn parsed_package(&self) -> Result<ParsedMovePackage, Error> {
592 ParsedMovePackage::read_from_package(&self.native)
593 .map_err(|e| Error::Internal(format!("Error reading package: {e}")))
594 }
595
596 fn checkpoint_viewed_at_impl(&self) -> u64 {
599 self.super_.checkpoint_viewed_at
600 }
601
602 pub(crate) fn module_impl(&self, name: &str) -> Result<Option<MoveModule>, Error> {
603 use PackageCacheError as E;
604 match (
605 self.native.serialized_module_map().get(name),
606 self.parsed_package()?.module(name),
607 ) {
608 (Some(native), Ok(parsed)) => Ok(Some(MoveModule {
609 storage_id: self.super_.address,
610 native: native.clone(),
611 parsed: parsed.clone(),
612 checkpoint_viewed_at: self.checkpoint_viewed_at_impl(),
613 })),
614
615 (None, _) | (_, Err(E::ModuleNotFound(_, _))) => Ok(None),
616 (_, Err(e)) => Err(Error::Internal(format!(
617 "Unexpected error fetching module: {e}"
618 ))),
619 }
620 }
621
622 pub(crate) fn by_id_at(checkpoint_viewed_at: u64) -> PackageLookup {
624 PackageLookup::ById {
625 checkpoint_viewed_at,
626 }
627 }
628
629 pub(crate) fn by_version(version: u64, checkpoint_viewed_at: u64) -> PackageLookup {
633 PackageLookup::Versioned {
634 version,
635 checkpoint_viewed_at,
636 }
637 }
638
639 pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> PackageLookup {
642 PackageLookup::Latest {
643 checkpoint_viewed_at,
644 }
645 }
646
647 pub(crate) async fn query(
648 ctx: &Context<'_>,
649 address: IotaAddress,
650 key: PackageLookup,
651 ) -> Result<Option<Self>, Error> {
652 let (address, key) = match key {
653 PackageLookup::ById {
654 checkpoint_viewed_at,
655 } => (address, Object::latest_at(checkpoint_viewed_at)),
656
657 PackageLookup::Versioned {
658 version,
659 checkpoint_viewed_at,
660 } => {
661 if is_system_package(address) {
662 (address, Object::at_version(version, checkpoint_viewed_at))
663 } else {
664 let DataLoader(loader) = &ctx.data_unchecked();
665 let Some(translation) = loader
666 .load_one(PackageVersionKey { address, version })
667 .await?
668 else {
669 return Ok(None);
670 };
671
672 (translation, Object::latest_at(checkpoint_viewed_at))
673 }
674 }
675
676 PackageLookup::Latest {
677 checkpoint_viewed_at,
678 } => {
679 if is_system_package(address) {
680 (address, Object::latest_at(checkpoint_viewed_at))
681 } else {
682 let DataLoader(loader) = &ctx.data_unchecked();
683 let Some(translation) = loader
684 .load_one(LatestKey {
685 address,
686 checkpoint_viewed_at,
687 })
688 .await?
689 else {
690 return Ok(None);
691 };
692
693 (translation, Object::latest_at(checkpoint_viewed_at))
694 }
695 }
696 };
697
698 let Some(object) = Object::query(ctx, address, key).await? else {
699 return Ok(None);
700 };
701
702 Ok(Some(MovePackage::try_from(&object).map_err(|_| {
703 Error::Internal(format!("{address} is not a package"))
704 })?))
705 }
706
707 pub(crate) async fn paginate_by_checkpoint(
722 db: &Db,
723 page: Page<Cursor>,
724 filter: Option<MovePackageCheckpointFilter>,
725 checkpoint_viewed_at: u64,
726 ) -> Result<Connection<String, MovePackage>, Error> {
727 let cursor_viewed_at = page.validate_cursor_consistency()?;
728 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
729
730 let after_checkpoint: Option<u64> = filter
731 .as_ref()
732 .and_then(|f| f.after_checkpoint)
733 .map(|v| v.into());
734
735 let before_checkpoint = filter
737 .as_ref()
738 .and_then(|f| f.before_checkpoint)
739 .map(|v| v.into())
740 .unwrap_or(u64::MAX)
741 .min(checkpoint_viewed_at + 1);
742
743 let (prev, next, results) = db
744 .execute(move |conn| {
745 let mut q = query!(
746 r#"
747 SELECT
748 p.original_id,
749 o.*
750 FROM
751 packages p
752 INNER JOIN
753 objects_history o
754 ON
755 p.package_id = o.object_id
756 AND p.package_version = o.object_version
757 AND p.checkpoint_sequence_number = o.checkpoint_sequence_number
758 "#
759 );
760
761 q = filter!(
762 q,
763 format!("o.checkpoint_sequence_number < {before_checkpoint}")
764 );
765 if let Some(after) = after_checkpoint {
766 q = filter!(q, format!("{after} < o.checkpoint_sequence_number"));
767 }
768
769 page.paginate_raw_query::<StoredHistoryPackage>(conn, checkpoint_viewed_at, q)
770 })
771 .await?;
772
773 let mut conn = Connection::new(prev, next);
774
775 for stored in results {
778 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
779 let package =
780 MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?;
781 conn.edges.push(Edge::new(cursor, package));
782 }
783
784 Ok(conn)
785 }
786
787 pub(crate) async fn paginate_by_version(
805 db: &Db,
806 page: Page<Cursor>,
807 package: IotaAddress,
808 filter: Option<MovePackageVersionFilter>,
809 checkpoint_viewed_at: u64,
810 ) -> Result<Connection<String, MovePackage>, Error> {
811 let cursor_viewed_at = page.validate_cursor_consistency()?;
812 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
813 let (prev, next, results) = db
814 .execute(move |conn| {
815 page.paginate_raw_query::<StoredHistoryPackage>(
816 conn,
817 checkpoint_viewed_at,
818 if is_system_package(package) {
819 system_package_version_query(package, filter)
820 } else {
821 user_package_version_query(package, filter)
822 },
823 )
824 })
825 .await?;
826
827 let mut conn = Connection::new(prev, next);
828
829 for stored in results {
832 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
833 let package =
834 MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?;
835 conn.edges.push(Edge::new(cursor, package));
836 }
837
838 Ok(conn)
839 }
840
841 pub(crate) fn try_from_stored_history_object(
846 history_object: StoredHistoryObject,
847 checkpoint_viewed_at: u64,
848 ) -> Result<Self, Error> {
849 let object = Object::try_from_stored_history_object(
850 history_object,
851 checkpoint_viewed_at,
852 None,
854 )?;
855 Self::try_from(&object).map_err(|_| Error::Internal("Not a package!".to_string()))
856 }
857}
858
/// Lets pagination check that all package cursors in a request were created at the same
/// checkpoint.
impl Checkpointed for Cursor {
    /// The checkpoint recorded in the cursor's `checkpoint_viewed_at` field.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
864
865impl RawPaginated<Cursor> for StoredHistoryPackage {
866 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
867 filter!(
868 query,
869 format!(
870 "o.checkpoint_sequence_number > {cp} OR (\
871 o.checkpoint_sequence_number = {cp} AND
872 original_id > '\\x{id}'::bytea OR (\
873 original_id = '\\x{id}'::bytea AND \
874 o.object_version >= {pv}\
875 ))",
876 cp = cursor.checkpoint_sequence_number,
877 id = hex::encode(&cursor.original_id),
878 pv = cursor.package_version,
879 )
880 )
881 }
882
883 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
884 filter!(
885 query,
886 format!(
887 "o.checkpoint_sequence_number < {cp} OR (\
888 o.checkpoint_sequence_number = {cp} AND
889 original_id < '\\x{id}'::bytea OR (\
890 original_id = '\\x{id}'::bytea AND \
891 o.object_version <= {pv}\
892 ))",
893 cp = cursor.checkpoint_sequence_number,
894 id = hex::encode(&cursor.original_id),
895 pv = cursor.package_version,
896 )
897 )
898 }
899
900 fn order(asc: bool, query: RawQuery) -> RawQuery {
901 if asc {
902 query
903 .order_by("o.checkpoint_sequence_number ASC")
904 .order_by("original_id ASC")
905 .order_by("o.object_version ASC")
906 } else {
907 query
908 .order_by("o.checkpoint_sequence_number DESC")
909 .order_by("original_id DESC")
910 .order_by("o.object_version DESC")
911 }
912 }
913}
914
915impl Target<Cursor> for StoredHistoryPackage {
916 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
917 Cursor::new(PackageCursor {
918 checkpoint_sequence_number: self.object.checkpoint_sequence_number as u64,
919 original_id: self.original_id.clone(),
920 package_version: self.object.object_version as u64,
921 checkpoint_viewed_at,
922 })
923 }
924}
925
// Package cursors take `ScanLimited` as-is: the impl body is empty, so no trait items are
// overridden.
impl ScanLimited for BcsCursor<PackageCursor> {}
927
928impl Loader<PackageVersionKey> for Db {
929 type Value = IotaAddress;
930 type Error = Error;
931
932 async fn load(
933 &self,
934 keys: &[PackageVersionKey],
935 ) -> Result<HashMap<PackageVersionKey, IotaAddress>, Error> {
936 use packages::dsl;
937 let other = diesel::alias!(packages as other);
938
939 let id_versions: BTreeSet<_> = keys
940 .iter()
941 .map(|k| (k.address.into_vec(), k.version as i64))
942 .collect();
943
944 let stored_packages: Vec<(Vec<u8>, i64, Vec<u8>)> = self
945 .execute(move |conn| {
946 conn.results(|| {
947 let mut query = dsl::packages
948 .inner_join(other.on(dsl::original_id.eq(other.field(dsl::original_id))))
949 .select((
950 dsl::package_id,
951 other.field(dsl::package_version),
952 other.field(dsl::package_id),
953 ))
954 .into_boxed();
955
956 for (id, version) in id_versions.iter().cloned() {
957 query = query.or_filter(
958 dsl::package_id
959 .eq(id)
960 .and(other.field(dsl::package_version).eq(version)),
961 );
962 }
963
964 query
965 })
966 })
967 .await
968 .map_err(|e| Error::Internal(format!("Failed to load packages: {e}")))?;
969
970 let mut result = HashMap::new();
971 for (id, version, other_id) in stored_packages {
972 result.insert(
973 PackageVersionKey {
974 address: addr(&id)?,
975 version: version as u64,
976 },
977 addr(&other_id)?,
978 );
979 }
980
981 Ok(result)
982 }
983}
984
985impl Loader<LatestKey> for Db {
986 type Value = IotaAddress;
987 type Error = Error;
988
989 async fn load(&self, keys: &[LatestKey]) -> Result<HashMap<LatestKey, IotaAddress>, Error> {
990 use packages::dsl;
991 let other = diesel::alias!(packages as other);
992
993 let mut ids_by_cursor: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
994 for key in keys {
995 ids_by_cursor
996 .entry(key.checkpoint_viewed_at)
997 .or_default()
998 .insert(key.address.into_vec());
999 }
1000
1001 let futures = ids_by_cursor
1003 .into_iter()
1004 .map(|(checkpoint_viewed_at, ids)| {
1005 self.execute(move |conn| {
1006 let results: Vec<(Vec<u8>, Vec<u8>)> = conn.results(|| {
1007 let o_original_id = other.field(dsl::original_id);
1008 let o_package_id = other.field(dsl::package_id);
1009 let o_cp_seq_num = other.field(dsl::checkpoint_sequence_number);
1010 let o_version = other.field(dsl::package_version);
1011
1012 let query = dsl::packages
1013 .inner_join(other.on(dsl::original_id.eq(o_original_id)))
1014 .select((dsl::package_id, o_package_id))
1015 .filter(dsl::package_id.eq_any(ids.iter().cloned()))
1016 .filter(o_cp_seq_num.le(checkpoint_viewed_at as i64))
1017 .order_by((dsl::package_id, dsl::original_id, o_version.desc()))
1018 .distinct_on((dsl::package_id, dsl::original_id));
1019 query
1020 })?;
1021
1022 Ok::<_, diesel::result::Error>(
1023 results
1024 .into_iter()
1025 .map(|(p, latest)| (checkpoint_viewed_at, p, latest))
1026 .collect::<Vec<_>>(),
1027 )
1028 })
1029 });
1030
1031 let groups = futures::future::join_all(futures).await;
1033
1034 let mut results = HashMap::new();
1035 for group in groups {
1036 for (checkpoint_viewed_at, address, latest) in
1037 group.map_err(|e| Error::Internal(format!("Failed to fetch packages: {e}")))?
1038 {
1039 results.insert(
1040 LatestKey {
1041 address: addr(&address)?,
1042 checkpoint_viewed_at,
1043 },
1044 addr(&latest)?,
1045 );
1046 }
1047 }
1048
1049 Ok(results)
1050 }
1051}
1052
1053impl TryFrom<&Object> for MovePackage {
1054 type Error = MovePackageDowncastError;
1055
1056 fn try_from(object: &Object) -> Result<Self, MovePackageDowncastError> {
1057 let Some(native) = object.native_impl() else {
1058 return Err(MovePackageDowncastError);
1059 };
1060
1061 if let Data::Package(move_package) = &native.data {
1062 Ok(Self {
1063 super_: object.clone(),
1064 native: move_package.clone(),
1065 })
1066 } else {
1067 Err(MovePackageDowncastError)
1068 }
1069 }
1070}
1071
1072fn system_package_version_query(
1076 package: IotaAddress,
1077 filter: Option<MovePackageVersionFilter>,
1078) -> RawQuery {
1079 let mut q = query!(
1082 r#"
1083 SELECT
1084 o.object_id AS original_id,
1085 o.*
1086 FROM
1087 objects_version v
1088 LEFT JOIN
1089 objects_history o
1090 ON
1091 v.object_id = o.object_id
1092 AND v.object_version = o.object_version
1093 AND v.cp_sequence_number = o.checkpoint_sequence_number
1094 "#
1095 );
1096
1097 q = filter!(
1098 q,
1099 format!(
1100 "v.object_id = '\\x{}'::bytea",
1101 hex::encode(package.into_vec())
1102 )
1103 );
1104
1105 if let Some(after) = filter.as_ref().and_then(|f| f.after_version) {
1106 let a: u64 = after.into();
1107 q = filter!(q, format!("v.object_version > {a}"));
1108 }
1109
1110 if let Some(before) = filter.as_ref().and_then(|f| f.before_version) {
1111 let b: u64 = before.into();
1112 q = filter!(q, format!("v.object_version < {b}"));
1113 }
1114
1115 q
1116}
1117
1118fn user_package_version_query(
1121 package: IotaAddress,
1122 filter: Option<MovePackageVersionFilter>,
1123) -> RawQuery {
1124 let mut q = query!(
1125 r#"
1126 SELECT
1127 p.original_id,
1128 o.*
1129 FROM
1130 packages q
1131 INNER JOIN
1132 packages p
1133 ON
1134 q.original_id = p.original_id
1135 INNER JOIN
1136 objects_history o
1137 ON
1138 p.package_id = o.object_id
1139 AND p.package_version = o.object_version
1140 AND p.checkpoint_sequence_number = o.checkpoint_sequence_number
1141 "#
1142 );
1143
1144 q = filter!(
1145 q,
1146 format!(
1147 "q.package_id = '\\x{}'::bytea",
1148 hex::encode(package.into_vec())
1149 )
1150 );
1151
1152 if let Some(after) = filter.as_ref().and_then(|f| f.after_version) {
1153 let a: u64 = after.into();
1154 q = filter!(q, format!("p.package_version > {a}"));
1155 }
1156
1157 if let Some(before) = filter.as_ref().and_then(|f| f.before_version) {
1158 let b: u64 = before.into();
1159 q = filter!(q, format!("p.package_version < {b}"));
1160 }
1161
1162 q
1163}