1use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11 GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14 DelegatedStake, DelegatedTimelockedStake, IotaCommittee,
15 IotaSystemStateSummary as IotaSystemStateSummarySchema,
16 IotaSystemStateSummaryV1 as IotaSystemStateSummaryV1Schema, Stake, StakeStatus,
17 TimelockedStake, ValidatorApy, ValidatorApys,
18};
19use iota_metrics::spawn_monitored_task;
20use iota_open_rpc::Module;
21use iota_sdk_types::{Address, ObjectId};
22use iota_types::{
23 MoveTypeTagTrait,
24 committee::EpochId,
25 dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
26 error::{IotaError, UserInputError},
27 governance::StakedIota,
28 id::ID,
29 iota_serde::BigInt,
30 iota_system_state::{
31 IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
32 iota_system_state_summary::{IotaSystemStateSummaryV1, IotaSystemStateSummaryV2},
33 },
34 object::{Object, ObjectRead},
35 timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44 IotaRpcModule, ObjectProvider,
45 authority_state::StateRead,
46 error::{Error, IotaRpcInputError, RpcInterimResult},
47 logger::FutureWithTracing as _,
48};
49
50type ValidatorTable = (Address, ObjectId, ObjectId, u64, bool);
51
52#[derive(Clone)]
53pub struct GovernanceReadApi {
54 state: Arc<dyn StateRead>,
55 pub metrics: Arc<JsonRpcMetrics>,
56}
57
58impl GovernanceReadApi {
59 pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60 Self { state, metrics }
61 }
62
63 async fn get_staked_iota(&self, owner: Address) -> Result<Vec<StakedIota>, Error> {
64 let state = self.state.clone();
65 let result =
66 spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68 self.metrics
69 .get_stake_iota_result_size
70 .observe(result.len() as f64);
71 self.metrics
72 .get_stake_iota_result_size_total
73 .inc_by(result.len() as u64);
74 Ok(result)
75 }
76
77 async fn get_timelocked_staked_iota(
78 &self,
79 owner: Address,
80 ) -> Result<Vec<TimelockedStakedIota>, Error> {
81 let state = self.state.clone();
82 let result =
83 spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84 .await??;
85
86 self.metrics
87 .get_stake_iota_result_size
88 .observe(result.len() as f64);
89 self.metrics
90 .get_stake_iota_result_size_total
91 .inc_by(result.len() as u64);
92 Ok(result)
93 }
94
95 async fn get_stakes_by_ids(
96 &self,
97 staked_iota_ids: Vec<ObjectId>,
98 ) -> Result<Vec<DelegatedStake>, Error> {
99 let state = self.state.clone();
100 let stakes_read = spawn_monitored_task!(async move {
101 staked_iota_ids
102 .iter()
103 .map(|id| state.get_object_read(id))
104 .collect::<Result<Vec<_>, _>>()
105 })
106 .await??;
107
108 if stakes_read.is_empty() {
109 return Ok(vec![]);
110 }
111
112 let stakes: Vec<(StakedIota, bool)> = self
113 .stakes_with_status(stakes_read.into_iter())
114 .await?
115 .into_iter()
116 .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117 .collect::<Result<_, _>>()?;
118
119 self.get_delegated_stakes(stakes).await
120 }
121
122 async fn get_stakes(&self, owner: Address) -> Result<Vec<DelegatedStake>, Error> {
123 let timer = self.metrics.get_stake_iota_latency.start_timer();
124 let stakes = self.get_staked_iota(owner).await?;
125 if stakes.is_empty() {
126 return Ok(vec![]);
127 }
128 drop(timer);
129
130 let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132 let self_clone = self.clone();
133 spawn_monitored_task!(
134 self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135 )
136 .await?
137 }
138
139 async fn get_timelocked_stakes_by_ids(
140 &self,
141 timelocked_staked_iota_ids: Vec<ObjectId>,
142 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143 let state = self.state.clone();
144 let stakes_read = spawn_monitored_task!(async move {
145 timelocked_staked_iota_ids
146 .iter()
147 .map(|id| state.get_object_read(id))
148 .collect::<Result<Vec<_>, _>>()
149 })
150 .await??;
151
152 if stakes_read.is_empty() {
153 return Ok(vec![]);
154 }
155
156 let stakes: Vec<(TimelockedStakedIota, bool)> = self
157 .stakes_with_status(stakes_read.into_iter())
158 .await?
159 .into_iter()
160 .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161 .collect::<Result<_, _>>()?;
162
163 self.get_delegated_timelocked_stakes(stakes).await
164 }
165
166 async fn get_timelocked_stakes(
167 &self,
168 owner: Address,
169 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170 let timer = self.metrics.get_stake_iota_latency.start_timer();
171 let stakes = self.get_timelocked_staked_iota(owner).await?;
172 if stakes.is_empty() {
173 return Ok(vec![]);
174 }
175 drop(timer);
176
177 let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179 let self_clone = self.clone();
180 spawn_monitored_task!(
181 self_clone
182 .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183 )
184 .await?
185 }
186
187 async fn get_delegated_stakes(
188 &self,
189 stakes: Vec<(StakedIota, bool)>,
190 ) -> Result<Vec<DelegatedStake>, Error> {
191 let pools = stakes.into_iter().fold(
192 BTreeMap::<_, Vec<_>>::new(),
193 |mut pools, (stake, exists)| {
194 pools
195 .entry(stake.pool_id())
196 .or_default()
197 .push((stake, exists));
198 pools
199 },
200 );
201
202 let system_state = self.get_system_state()?;
203 let system_state_summary = IotaSystemStateSummaryV2::try_from(
204 system_state.clone().into_iota_system_state_summary(),
205 )?;
206
207 let rates = exchange_rates(&self.state, system_state_summary.epoch)
208 .await?
209 .into_iter()
210 .chain(candidate_validators_exchange_rate(&self.state)?)
212 .chain(pending_validators_exchange_rate(&self.state)?)
214 .map(|rates| (rates.pool_id, rates))
215 .collect::<BTreeMap<_, _>>();
216
217 let mut delegated_stakes = vec![];
218 for (pool_id, stakes) in pools {
219 let rate_table = rates.get(&pool_id).ok_or_else(|| {
221 IotaRpcInputError::GenericNotFound(format!(
222 "Cannot find rates for staking pool {pool_id}"
223 ))
224 })?;
225 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227 let mut delegations = vec![];
228 for (stake, exists) in stakes {
229 let status = stake_status(
230 system_state_summary.epoch,
231 stake.activation_epoch(),
232 stake.principal(),
233 exists,
234 current_rate,
235 rate_table,
236 );
237 delegations.push(Stake {
238 staked_iota_id: stake.id(),
239 stake_request_epoch: stake.activation_epoch() - 1,
241 stake_active_epoch: stake.activation_epoch(),
242 principal: stake.principal(),
243 status,
244 })
245 }
246 delegated_stakes.push(DelegatedStake {
247 validator_address: rate_table.address,
248 staking_pool: pool_id,
249 stakes: delegations,
250 })
251 }
252 Ok(delegated_stakes)
253 }
254
255 async fn get_delegated_timelocked_stakes(
256 &self,
257 stakes: Vec<(TimelockedStakedIota, bool)>,
258 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259 let pools = stakes.into_iter().fold(
260 BTreeMap::<_, Vec<_>>::new(),
261 |mut pools, (stake, exists)| {
262 pools
263 .entry(stake.pool_id())
264 .or_default()
265 .push((stake, exists));
266 pools
267 },
268 );
269
270 let system_state = self.get_system_state()?;
271 let system_state_summary = IotaSystemStateSummaryV2::try_from(
272 system_state.clone().into_iota_system_state_summary(),
273 )?;
274
275 let rates = exchange_rates(&self.state, system_state_summary.epoch)
276 .await?
277 .into_iter()
278 .chain(candidate_validators_exchange_rate(&self.state)?)
280 .chain(pending_validators_exchange_rate(&self.state)?)
282 .map(|rates| (rates.pool_id, rates))
283 .collect::<BTreeMap<_, _>>();
284
285 let mut delegated_stakes = vec![];
286 for (pool_id, stakes) in pools {
287 let rate_table = rates.get(&pool_id).ok_or_else(|| {
289 IotaRpcInputError::GenericNotFound(format!(
290 "Cannot find rates for staking pool {pool_id}"
291 ))
292 })?;
293 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
294
295 let mut delegations = vec![];
296 for (stake, exists) in stakes {
297 let status = stake_status(
298 system_state_summary.epoch,
299 stake.activation_epoch(),
300 stake.principal(),
301 exists,
302 current_rate,
303 rate_table,
304 );
305 delegations.push(TimelockedStake {
306 timelocked_staked_iota_id: stake.id(),
307 stake_request_epoch: stake.activation_epoch() - 1,
309 stake_active_epoch: stake.activation_epoch(),
310 principal: stake.principal(),
311 status,
312 expiration_timestamp_ms: stake.expiration_timestamp_ms(),
313 label: stake.label().clone(),
314 })
315 }
316 delegated_stakes.push(DelegatedTimelockedStake {
317 validator_address: rate_table.address,
318 staking_pool: pool_id,
319 stakes: delegations,
320 })
321 }
322 Ok(delegated_stakes)
323 }
324
325 async fn stakes_with_status(
326 &self,
327 iter: impl Iterator<Item = ObjectRead>,
328 ) -> Result<Vec<(Object, bool)>, Error> {
329 let mut stakes = vec![];
330
331 for stake in iter {
332 match stake {
333 ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
334 ObjectRead::Deleted(object_ref) => {
335 let Some(o) = self
336 .state
337 .find_object_lt_or_eq_version(
338 &object_ref.object_id,
339 &object_ref.version.previous().unwrap(),
340 )
341 .await?
342 else {
343 Err(IotaRpcInputError::UserInput(
344 UserInputError::ObjectNotFound {
345 object_id: object_ref.object_id,
346 version: None,
347 },
348 ))?
349 };
350 stakes.push((o, false));
351 }
352 ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
353 UserInputError::ObjectNotFound {
354 object_id: id,
355 version: None,
356 },
357 ))?,
358 }
359 }
360
361 Ok(stakes)
362 }
363
364 fn get_system_state(&self) -> Result<IotaSystemState, Error> {
365 Ok(self.state.get_system_state()?)
366 }
367}
368
369#[async_trait]
370impl GovernanceReadApiServer for GovernanceReadApi {
371 #[instrument(skip(self, staked_iota_ids), fields(staked_iota_ids = staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
372 async fn get_stakes_by_ids(
373 &self,
374 staked_iota_ids: Vec<ObjectId>,
375 ) -> RpcResult<Vec<DelegatedStake>> {
376 self.get_stakes_by_ids(staked_iota_ids).trace().await
377 }
378
379 #[instrument(skip(self, owner), fields(owner = %owner))]
380 async fn get_stakes(&self, owner: Address) -> RpcResult<Vec<DelegatedStake>> {
381 self.get_stakes(owner).trace().await
382 }
383
384 #[instrument(skip(self, timelocked_staked_iota_ids), fields(timelocked_staked_iota_ids = timelocked_staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
385 async fn get_timelocked_stakes_by_ids(
386 &self,
387 timelocked_staked_iota_ids: Vec<ObjectId>,
388 ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
389 self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
390 .trace()
391 .await
392 }
393
394 #[instrument(skip(self, owner), fields(owner = %owner))]
395 async fn get_timelocked_stakes(
396 &self,
397 owner: Address,
398 ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
399 self.get_timelocked_stakes(owner).trace().await
400 }
401
402 #[instrument(skip(self))]
403 async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
404 async move {
405 self.state
406 .get_or_latest_committee(epoch)
407 .map(|committee| committee.into())
408 .map_err(Error::from)
409 }
410 .trace()
411 .await
412 }
413
414 #[instrument(skip(self))]
415 async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummarySchema> {
416 async move {
417 Ok(self
418 .state
419 .get_system_state()?
420 .into_iota_system_state_summary()
421 .into())
422 }
423 .trace()
424 .await
425 }
426
427 #[instrument(skip(self))]
428 async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1Schema> {
429 async move {
430 Ok(IotaSystemStateSummaryV1::try_from(
431 self.state
432 .get_system_state()?
433 .into_iota_system_state_summary(),
434 )?
435 .into())
436 }
437 .trace()
438 .await
439 }
440
441 #[instrument(skip(self))]
442 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
443 async move {
444 let epoch_store = self.state.load_epoch_store_one_call_per_task();
445 Ok(epoch_store.reference_gas_price().into())
446 }
447 .trace()
448 .await
449 }
450
451 #[instrument(skip(self))]
452 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
453 info!("get_validator_apy");
454 let system_state_summary = self.get_latest_iota_system_state().await?;
455
456 let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
457 .await
458 .map_err(|e| error_object_from_rpc(e.into()))?;
459
460 let apys = calculate_apys(exchange_rate_table);
461
462 Ok(ValidatorApys {
463 apys,
464 epoch: system_state_summary.epoch,
465 })
466 }
467}
468
469pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
470 let mut apys = vec![];
471
472 for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
473 let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
474
475 let mean_apy = mean_apy_from_exchange_rates(exchange_rates);
476 apys.push(ValidatorApy {
477 address: rates.address,
478 apy: mean_apy,
479 });
480 }
481 apys
482}
483
484pub fn mean_apy_from_exchange_rates<'er>(
492 exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
493) -> f64 {
494 const MAX_VALID_APY: f64 = 1.00;
496 const SAMPLES: usize = 7;
497
498 let rates = exchange_rates.clone().dropping(1);
499 let rates_next = exchange_rates.dropping_back(1);
500
501 let mut apys = rates
502 .zip(rates_next)
503 .take(SAMPLES + 1)
504 .map(|(er, er_next)| calculate_apy(er, er_next))
505 .collect::<Vec<_>>();
506
507 if apys.is_empty() || apys.iter().any(|&apy| apy < 0.0) {
509 return 0.0;
510 }
511 let has_outlier = apys.get(SAMPLES).is_some_and(|&apy| apy <= 0.0)
515 || apys.iter().any(|&apy| apy > MAX_VALID_APY);
516
517 apys.truncate(SAMPLES);
518
519 if has_outlier {
520 Data::new(apys).median()
521 } else {
522 let sum: f64 = apys.iter().sum();
523 sum / SAMPLES as f64
524 }
525}
526
527const APY_DUST_THRESHOLD: f64 = 1e-9;
529
530fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
535 let apy = ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0;
536 if apy.abs() < APY_DUST_THRESHOLD {
537 0.0
538 } else {
539 apy
540 }
541}
542
543fn stake_status(
544 epoch: u64,
545 activation_epoch: u64,
546 principal: u64,
547 exists: bool,
548 current_rate: Option<&PoolTokenExchangeRate>,
549 rate_table: &ValidatorExchangeRates,
550) -> StakeStatus {
551 if !exists {
552 StakeStatus::Unstaked
553 } else if epoch >= activation_epoch {
554 let estimated_reward = if let Some(current_rate) = current_rate {
556 let stake_rate = rate_table
557 .rates
558 .iter()
559 .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
560 .unwrap_or_default();
561 let estimated_reward =
562 ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
563 max(0, estimated_reward.round() as u64)
564 } else {
565 0
566 };
567 StakeStatus::Active { estimated_reward }
568 } else {
569 StakeStatus::Pending
570 }
571}
572
573#[cached(
577 type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
578 create = "{ SizedCache::with_size(1) }",
579 convert = "{ _current_epoch }",
580 result = true
581)]
582async fn exchange_rates(
583 state: &Arc<dyn StateRead>,
584 _current_epoch: EpochId,
585) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
586 Ok(active_validators_exchange_rates(state)?
587 .into_iter()
588 .chain(inactive_validators_exchange_rates(state)?.into_iter())
589 .collect())
590}
591
592#[cfg(msim)]
600pub async fn clear_exchange_rates_cache_for_testing() {
601 use cached::Cached;
602 if let Some(mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) {
603 let mut cache = mutex.lock().await;
604 cache.cache_clear();
605 }
606}
607
608fn validator_exchange_rates(
610 state: &Arc<dyn StateRead>,
611 tables: Vec<ValidatorTable>,
612) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
613 if tables.is_empty() {
614 return Ok(vec![]);
615 };
616
617 let mut exchange_rates = vec![];
618 for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
620 let mut rates = state
621 .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
622 .into_iter()
623 .map(|(_object_id, df)| {
624 let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
625 IotaError::ObjectDeserialization {
626 error: e.to_string(),
627 }
628 })?;
629
630 let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
631 &state.get_object_store().as_ref(),
632 exchange_rates_id,
633 &epoch,
634 )?;
635
636 Ok::<_, IotaError>((epoch, exchange_rate))
637 })
638 .collect::<Result<Vec<_>, _>>()?;
639
640 rates = backfill_rates(rates);
643
644 exchange_rates.push(ValidatorExchangeRates {
645 address,
646 pool_id,
647 active,
648 rates,
649 });
650 }
651
652 Ok(exchange_rates)
653}
654
655fn active_validators_exchange_rates(
657 state: &Arc<dyn StateRead>,
658) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
659 let system_state_summary = IotaSystemStateSummaryV2::try_from(
660 state.get_system_state()?.into_iota_system_state_summary(),
661 )?;
662
663 let tables = system_state_summary
664 .active_validators
665 .into_iter()
666 .map(|validator| {
667 (
668 validator.iota_address,
669 validator.staking_pool_id,
670 validator.exchange_rates_id,
671 validator.exchange_rates_size,
672 true,
673 )
674 })
675 .collect();
676
677 validator_exchange_rates(state, tables)
678}
679
680fn inactive_validators_exchange_rates(
682 state: &Arc<dyn StateRead>,
683) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
684 let system_state_summary = IotaSystemStateSummaryV2::try_from(
685 state.get_system_state()?.into_iota_system_state_summary(),
686 )?;
687
688 let tables = validator_summary_from_system_state(
689 state,
690 system_state_summary.inactive_pools_id,
691 system_state_summary.inactive_pools_size,
692 |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
693 Some(system_state_summary.protocol_version),
694 )?;
695
696 validator_exchange_rates(state, tables)
697}
698
699fn pending_validators_exchange_rate(
705 state: &Arc<dyn StateRead>,
706) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
707 let system_state = state.get_system_state()?;
708 let object_store = state.get_object_store();
709
710 let tables = system_state
712 .get_pending_active_validators(object_store)?
713 .into_iter()
714 .map(|pending_active_validator| {
715 (
716 pending_active_validator.iota_address,
717 pending_active_validator.staking_pool_id,
718 pending_active_validator.exchange_rates_id,
719 pending_active_validator.exchange_rates_size,
720 false,
721 )
722 })
723 .collect::<Vec<ValidatorTable>>();
724
725 validator_exchange_rates(state, tables)
726}
727
728fn candidate_validators_exchange_rate(
734 state: &Arc<dyn StateRead>,
735) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
736 let system_state_summary = IotaSystemStateSummaryV2::try_from(
737 state.get_system_state()?.into_iota_system_state_summary(),
738 )?;
739
740 let tables = validator_summary_from_system_state(
743 state,
744 system_state_summary.validator_candidates_id,
745 system_state_summary.validator_candidates_size,
746 |df| bcs::from_bytes::<Address>(&df.bcs_name).map_err(Into::into),
747 Some(system_state_summary.protocol_version),
748 )?;
749
750 validator_exchange_rates(state, tables)
751}
752
753fn validator_summary_from_system_state<K, F>(
800 state: &Arc<dyn StateRead>,
801 table_id: ObjectId,
802 limit: u64,
803 key: F,
804 protocol_version: Option<u64>,
805) -> RpcInterimResult<Vec<ValidatorTable>>
806where
807 F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
808 K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
809{
810 let object_store = state.get_object_store();
811
812 state
813 .get_dynamic_fields(table_id, None, limit as usize)?
814 .into_iter()
815 .map(|(_object_id, df)| {
816 let validator_summary =
817 get_validator_from_table(object_store, table_id, &key(df)?, protocol_version)?;
818
819 Ok((
820 validator_summary.iota_address,
821 validator_summary.staking_pool_id,
822 validator_summary.exchange_rates_id,
823 validator_summary.exchange_rates_size,
824 false,
825 ))
826 })
827 .collect()
828}
829
830#[derive(Clone, Debug)]
831pub struct ValidatorExchangeRates {
832 pub address: Address,
833 pub pool_id: ObjectId,
834 pub active: bool,
835 pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
836}
837
838fn backfill_rates(
842 mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
843) -> Vec<(EpochId, PoolTokenExchangeRate)> {
844 if rates.is_empty() {
845 return rates;
846 }
847 rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
849
850 let (min_epoch, _) = rates.first().expect("rates should not be empty");
852 let (max_epoch, _) = rates.last().expect("rates should not be empty");
853 let expected_len = (max_epoch - min_epoch + 1) as usize;
854 let current_len = rates.len();
855
856 if current_len == expected_len {
858 rates.reverse();
859 return rates;
860 }
861
862 let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
863 let mut missing_rates = Vec::with_capacity(expected_len - current_len);
864 for (epoch_id, rate) in rates {
865 if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
867 for missing_epoch_id in prev_epoch_id + 1..epoch_id {
868 missing_rates.push((missing_epoch_id, prev_rate.clone()));
869 }
870 };
871
872 filled_rates.append(&mut missing_rates);
877 filled_rates.push((epoch_id, rate));
878 }
879 filled_rates.reverse();
880 filled_rates
881}
882
883impl IotaRpcModule for GovernanceReadApi {
884 fn rpc(self) -> RpcModule<Self> {
885 self.into_rpc()
886 }
887
888 fn rpc_doc_module() -> Module {
889 GovernanceReadApiOpenRpc::module_doc()
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use iota_types::iota_system_state::PoolTokenExchangeRate;
896
897 use super::*;
898
899 #[test]
900 fn calculate_apys_with_outliers() {
901 let file =
902 std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-test.json")
903 .unwrap();
904 let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
905 serde_json::from_reader(file).unwrap();
906
907 let mut address_map = BTreeMap::new();
908
909 let exchange_rates = rates
910 .into_iter()
911 .map(|(validator, rates_vec)| {
912 let address = Address::random();
913 address_map.insert(address, validator);
914 ValidatorExchangeRates {
915 address,
916 pool_id: ObjectId::random(),
917 active: true,
918 rates: backfill_rates(rates_vec),
919 }
920 })
921 .collect();
922
923 let apys = calculate_apys(exchange_rates);
924
925 for apy in &apys {
926 println!("{}: {}", address_map[&apy.address], apy.apy);
927 assert!(apy.apy < 0.15)
928 }
929 }
930
931 #[test]
932 fn calculate_apys_without_outliers() {
933 let file =
934 std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-feb26.json")
935 .unwrap();
936 let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
937 serde_json::from_reader(file).unwrap();
938
939 let mut address_map = BTreeMap::new();
940
941 let exchange_rates = rates
942 .into_iter()
943 .map(|(validator, rates_vec)| {
944 let address = Address::random();
945 address_map.insert(address, validator);
946 ValidatorExchangeRates {
947 address,
948 pool_id: ObjectId::random(),
949 active: true,
950 rates: backfill_rates(rates_vec),
951 }
952 })
953 .collect();
954
955 let apys = calculate_apys(exchange_rates);
956
957 for apy in &apys {
958 println!("{}: {}", address_map[&apy.address], apy.apy);
959 assert!(apy.apy < 0.15)
960 }
961 }
962
963 #[test]
964 fn calculate_apy_is_not_negative_for_zero_reward_epoch() {
965 let cases = [
971 (
972 (48_913_429_030_426_080u64, 43_331_127_650_932_384u64),
973 (48_641_042_011_532_656u64, 43_089_827_114_043_304u64),
974 ),
975 (
976 (33_370_417_056_337_732u64, 29_578_114_234_284_444u64),
977 (33_370_374_157_145_896u64, 29_578_076_210_270_704u64),
978 ),
979 ];
980
981 for ((i_old, t_old), (i_new, t_new)) in cases {
982 let er = PoolTokenExchangeRate::new_for_testing(i_old, t_old);
983 let er_next = PoolTokenExchangeRate::new_for_testing(i_new, t_new);
984 let apy = calculate_apy(&er, &er_next);
985 assert!(
986 (0.0..APY_DUST_THRESHOLD).contains(&apy),
987 "expected an effectively-zero, non-negative APY, got {apy}"
988 );
989 }
990 }
991
992 #[test]
993 fn test_backfill_rates_empty() {
994 let rates = vec![];
995 assert_eq!(backfill_rates(rates), vec![]);
996 }
997
998 #[test]
999 fn test_backfill_rates_no_gaps() {
1000 let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1001 let rate2 = PoolTokenExchangeRate::new_for_testing(200, 220);
1002 let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1003 let rates = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];
1004
1005 let expected: Vec<(u64, PoolTokenExchangeRate)> = vec![(3, rate3), (2, rate2), (1, rate1)];
1006 assert_eq!(backfill_rates(rates), expected);
1007 }
1008
1009 #[test]
1010 fn test_backfill_single_rate() {
1011 let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1012 let rates = vec![(1, rate1.clone())];
1013 let expected = vec![(1, rate1)];
1014 assert_eq!(backfill_rates(rates), expected);
1015 }
1016
1017 #[test]
1018 fn test_backfill_rates_with_gaps() {
1019 let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1020 let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1021 let rate5 = PoolTokenExchangeRate::new_for_testing(500, 550);
1022 let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];
1023
1024 let expected = vec![
1025 (5, rate5),
1026 (4, rate3.clone()),
1027 (3, rate3),
1028 (2, rate1.clone()),
1029 (1, rate1),
1030 ];
1031 assert_eq!(backfill_rates(rates), expected);
1032 }
1033
1034 #[test]
1035 fn test_backfill_rates_missing_middle_epoch() {
1036 let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1037 let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1038 let rates = vec![(1, rate1.clone()), (3, rate3.clone())];
1039 let expected = vec![(3, rate3), (2, rate1.clone()), (1, rate1)];
1040 assert_eq!(backfill_rates(rates), expected);
1041 }
1042
1043 #[test]
1044 fn test_backfill_rates_missing_middle_epochs() {
1045 let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1046 let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1047 let rates = vec![(1, rate1.clone()), (4, rate4.clone())];
1048 let expected = vec![
1049 (4, rate4),
1050 (3, rate1.clone()),
1051 (2, rate1.clone()),
1052 (1, rate1),
1053 ];
1054 assert_eq!(backfill_rates(rates), expected);
1055 }
1056
1057 #[test]
1058 fn test_backfill_rates_unordered_input() {
1059 let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1060 let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1061 let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1062 let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (4, rate4.clone())];
1063 let expected = vec![(4, rate4), (3, rate3), (2, rate1.clone()), (1, rate1)];
1064 assert_eq!(backfill_rates(rates), expected);
1065 }
1066}