1use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11 GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14 DelegatedStake, DelegatedTimelockedStake, IotaCommittee, Stake, StakeStatus, TimelockedStake,
15 ValidatorApy, ValidatorApys,
16};
17use iota_metrics::spawn_monitored_task;
18use iota_open_rpc::Module;
19use iota_types::{
20 MoveTypeTagTrait,
21 base_types::{IotaAddress, ObjectID},
22 committee::EpochId,
23 dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
24 error::{IotaError, UserInputError},
25 governance::StakedIota,
26 id::ID,
27 iota_serde::BigInt,
28 iota_system_state::{
29 IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
30 iota_system_state_summary::{
31 IotaSystemStateSummary, IotaSystemStateSummaryV1, IotaSystemStateSummaryV2,
32 },
33 },
34 object::{Object, ObjectRead},
35 timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44 IotaRpcModule, ObjectProvider,
45 authority_state::StateRead,
46 error::{Error, IotaRpcInputError, RpcInterimResult},
47 logger::FutureWithTracing as _,
48};
49
/// Per-validator lookup data used to load a staking pool's exchange rates.
///
/// In order: validator address, staking pool id, id of the exchange-rates
/// table, number of entries in that table, and whether the validator is
/// treated as active.
type ValidatorTable = (IotaAddress, ObjectID, ObjectID, u64, bool);
51
/// Governance read API: serves stake, committee, system-state and APY
/// queries on top of a [`StateRead`] handle.
#[derive(Clone)]
pub struct GovernanceReadApi {
    /// Handle used for every state lookup performed by this API.
    state: Arc<dyn StateRead>,
    /// Metrics updated by the stake queries (result sizes and latencies).
    pub metrics: Arc<JsonRpcMetrics>,
}
57
58impl GovernanceReadApi {
59 pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60 Self { state, metrics }
61 }
62
63 async fn get_staked_iota(&self, owner: IotaAddress) -> Result<Vec<StakedIota>, Error> {
64 let state = self.state.clone();
65 let result =
66 spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68 self.metrics
69 .get_stake_iota_result_size
70 .observe(result.len() as f64);
71 self.metrics
72 .get_stake_iota_result_size_total
73 .inc_by(result.len() as u64);
74 Ok(result)
75 }
76
77 async fn get_timelocked_staked_iota(
78 &self,
79 owner: IotaAddress,
80 ) -> Result<Vec<TimelockedStakedIota>, Error> {
81 let state = self.state.clone();
82 let result =
83 spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84 .await??;
85
86 self.metrics
87 .get_stake_iota_result_size
88 .observe(result.len() as f64);
89 self.metrics
90 .get_stake_iota_result_size_total
91 .inc_by(result.len() as u64);
92 Ok(result)
93 }
94
95 async fn get_stakes_by_ids(
96 &self,
97 staked_iota_ids: Vec<ObjectID>,
98 ) -> Result<Vec<DelegatedStake>, Error> {
99 let state = self.state.clone();
100 let stakes_read = spawn_monitored_task!(async move {
101 staked_iota_ids
102 .iter()
103 .map(|id| state.get_object_read(id))
104 .collect::<Result<Vec<_>, _>>()
105 })
106 .await??;
107
108 if stakes_read.is_empty() {
109 return Ok(vec![]);
110 }
111
112 let stakes: Vec<(StakedIota, bool)> = self
113 .stakes_with_status(stakes_read.into_iter())
114 .await?
115 .into_iter()
116 .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117 .collect::<Result<_, _>>()?;
118
119 self.get_delegated_stakes(stakes).await
120 }
121
122 async fn get_stakes(&self, owner: IotaAddress) -> Result<Vec<DelegatedStake>, Error> {
123 let timer = self.metrics.get_stake_iota_latency.start_timer();
124 let stakes = self.get_staked_iota(owner).await?;
125 if stakes.is_empty() {
126 return Ok(vec![]);
127 }
128 drop(timer);
129
130 let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132 let self_clone = self.clone();
133 spawn_monitored_task!(
134 self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135 )
136 .await?
137 }
138
139 async fn get_timelocked_stakes_by_ids(
140 &self,
141 timelocked_staked_iota_ids: Vec<ObjectID>,
142 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143 let state = self.state.clone();
144 let stakes_read = spawn_monitored_task!(async move {
145 timelocked_staked_iota_ids
146 .iter()
147 .map(|id| state.get_object_read(id))
148 .collect::<Result<Vec<_>, _>>()
149 })
150 .await??;
151
152 if stakes_read.is_empty() {
153 return Ok(vec![]);
154 }
155
156 let stakes: Vec<(TimelockedStakedIota, bool)> = self
157 .stakes_with_status(stakes_read.into_iter())
158 .await?
159 .into_iter()
160 .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161 .collect::<Result<_, _>>()?;
162
163 self.get_delegated_timelocked_stakes(stakes).await
164 }
165
166 async fn get_timelocked_stakes(
167 &self,
168 owner: IotaAddress,
169 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170 let timer = self.metrics.get_stake_iota_latency.start_timer();
171 let stakes = self.get_timelocked_staked_iota(owner).await?;
172 if stakes.is_empty() {
173 return Ok(vec![]);
174 }
175 drop(timer);
176
177 let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179 let self_clone = self.clone();
180 spawn_monitored_task!(
181 self_clone
182 .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183 )
184 .await?
185 }
186
187 async fn get_delegated_stakes(
188 &self,
189 stakes: Vec<(StakedIota, bool)>,
190 ) -> Result<Vec<DelegatedStake>, Error> {
191 let pools = stakes.into_iter().fold(
192 BTreeMap::<_, Vec<_>>::new(),
193 |mut pools, (stake, exists)| {
194 pools
195 .entry(stake.pool_id())
196 .or_default()
197 .push((stake, exists));
198 pools
199 },
200 );
201
202 let system_state = self.get_system_state()?;
203 let system_state_summary = IotaSystemStateSummaryV2::try_from(
204 system_state.clone().into_iota_system_state_summary(),
205 )?;
206
207 let rates = exchange_rates(&self.state, system_state_summary.epoch)
208 .await?
209 .into_iter()
210 .chain(candidate_validators_exchange_rate(&self.state)?.into_iter())
212 .chain(pending_validators_exchange_rate(&self.state)?.into_iter())
214 .map(|rates| (rates.pool_id, rates))
215 .collect::<BTreeMap<_, _>>();
216
217 let mut delegated_stakes = vec![];
218 for (pool_id, stakes) in pools {
219 let rate_table = rates.get(&pool_id).ok_or_else(|| {
221 IotaRpcInputError::GenericNotFound(format!(
222 "Cannot find rates for staking pool {pool_id}"
223 ))
224 })?;
225 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227 let mut delegations = vec![];
228 for (stake, exists) in stakes {
229 let status = stake_status(
230 system_state_summary.epoch,
231 stake.activation_epoch(),
232 stake.principal(),
233 exists,
234 current_rate,
235 rate_table,
236 );
237 delegations.push(Stake {
238 staked_iota_id: stake.id(),
239 stake_request_epoch: stake.activation_epoch() - 1,
241 stake_active_epoch: stake.activation_epoch(),
242 principal: stake.principal(),
243 status,
244 })
245 }
246 delegated_stakes.push(DelegatedStake {
247 validator_address: rate_table.address,
248 staking_pool: pool_id,
249 stakes: delegations,
250 })
251 }
252 Ok(delegated_stakes)
253 }
254
255 async fn get_delegated_timelocked_stakes(
256 &self,
257 stakes: Vec<(TimelockedStakedIota, bool)>,
258 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259 let pools = stakes.into_iter().fold(
260 BTreeMap::<_, Vec<_>>::new(),
261 |mut pools, (stake, exists)| {
262 pools
263 .entry(stake.pool_id())
264 .or_default()
265 .push((stake, exists));
266 pools
267 },
268 );
269
270 let system_state = self.get_system_state()?;
271 let system_state_summary = IotaSystemStateSummaryV2::try_from(
272 system_state.clone().into_iota_system_state_summary(),
273 )?;
274
275 let rates = exchange_rates(&self.state, system_state_summary.epoch)
276 .await?
277 .into_iter()
278 .chain(candidate_validators_exchange_rate(&self.state)?)
280 .chain(pending_validators_exchange_rate(&self.state)?)
282 .map(|rates| (rates.pool_id, rates))
283 .collect::<BTreeMap<_, _>>();
284
285 let mut delegated_stakes = vec![];
286 for (pool_id, stakes) in pools {
287 let rate_table = rates.get(&pool_id).ok_or_else(|| {
289 IotaRpcInputError::GenericNotFound(format!(
290 "Cannot find rates for staking pool {pool_id}"
291 ))
292 })?;
293 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
294
295 let mut delegations = vec![];
296 for (stake, exists) in stakes {
297 let status = stake_status(
298 system_state_summary.epoch,
299 stake.activation_epoch(),
300 stake.principal(),
301 exists,
302 current_rate,
303 rate_table,
304 );
305 delegations.push(TimelockedStake {
306 timelocked_staked_iota_id: stake.id(),
307 stake_request_epoch: stake.activation_epoch() - 1,
309 stake_active_epoch: stake.activation_epoch(),
310 principal: stake.principal(),
311 status,
312 expiration_timestamp_ms: stake.expiration_timestamp_ms(),
313 label: stake.label().clone(),
314 })
315 }
316 delegated_stakes.push(DelegatedTimelockedStake {
317 validator_address: rate_table.address,
318 staking_pool: pool_id,
319 stakes: delegations,
320 })
321 }
322 Ok(delegated_stakes)
323 }
324
325 async fn stakes_with_status(
326 &self,
327 iter: impl Iterator<Item = ObjectRead>,
328 ) -> Result<Vec<(Object, bool)>, Error> {
329 let mut stakes = vec![];
330
331 for stake in iter {
332 match stake {
333 ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
334 ObjectRead::Deleted((object_id, version, _)) => {
335 let Some(o) = self
336 .state
337 .find_object_lt_or_eq_version(&object_id, &version.one_before().unwrap())
338 .await?
339 else {
340 Err(IotaRpcInputError::UserInput(
341 UserInputError::ObjectNotFound {
342 object_id,
343 version: None,
344 },
345 ))?
346 };
347 stakes.push((o, false));
348 }
349 ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
350 UserInputError::ObjectNotFound {
351 object_id: id,
352 version: None,
353 },
354 ))?,
355 }
356 }
357
358 Ok(stakes)
359 }
360
361 fn get_system_state(&self) -> Result<IotaSystemState, Error> {
362 Ok(self.state.get_system_state()?)
363 }
364}
365
/// JSON-RPC entry points. The stake queries delegate to the inherent methods
/// of [`GovernanceReadApi`] with the same name.
#[async_trait]
impl GovernanceReadApiServer for GovernanceReadApi {
    /// Returns the delegated stakes for the given staked IOTA object ids.
    #[instrument(skip(self))]
    async fn get_stakes_by_ids(
        &self,
        staked_iota_ids: Vec<ObjectID>,
    ) -> RpcResult<Vec<DelegatedStake>> {
        self.get_stakes_by_ids(staked_iota_ids).trace().await
    }

    /// Returns all delegated stakes owned by `owner`.
    #[instrument(skip(self))]
    async fn get_stakes(&self, owner: IotaAddress) -> RpcResult<Vec<DelegatedStake>> {
        self.get_stakes(owner).trace().await
    }

    /// Returns the delegated timelocked stakes for the given object ids.
    #[instrument(skip(self))]
    async fn get_timelocked_stakes_by_ids(
        &self,
        timelocked_staked_iota_ids: Vec<ObjectID>,
    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
        self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
            .trace()
            .await
    }

    /// Returns all delegated timelocked stakes owned by `owner`.
    #[instrument(skip(self))]
    async fn get_timelocked_stakes(
        &self,
        owner: IotaAddress,
    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
        self.get_timelocked_stakes(owner).trace().await
    }

    /// Returns the committee for `epoch`, as resolved by
    /// `get_or_latest_committee` on the state handle.
    #[instrument(skip(self))]
    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
        async move {
            self.state
                .get_or_latest_committee(epoch)
                .map(|committee| committee.into())
                .map_err(Error::from)
        }
        .trace()
        .await
    }

    /// Returns the summary of the latest system state.
    #[instrument(skip(self))]
    async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummary> {
        async move {
            Ok(self
                .state
                .get_system_state()?
                .into_iota_system_state_summary())
        }
        .trace()
        .await
    }

    /// Returns the summary of the latest system state converted to its V1
    /// representation; fails if that conversion fails.
    #[instrument(skip(self))]
    async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1> {
        async move {
            Ok(self
                .state
                .get_system_state()?
                .into_iota_system_state_summary()
                .try_into()?)
        }
        .trace()
        .await
    }

    /// Returns the reference gas price reported by the loaded epoch store.
    #[instrument(skip(self))]
    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
        async move {
            let epoch_store = self.state.load_epoch_store_one_call_per_task();
            Ok(epoch_store.reference_gas_price().into())
        }
        .trace()
        .await
    }

    /// Returns the APY of every active validator together with the epoch of
    /// the system state the calculation is based on.
    #[instrument(skip(self))]
    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
        info!("get_validator_apy");
        let system_state_summary = self.get_latest_iota_system_state().await?;

        // Exchange rates of active and inactive validators, cached per epoch.
        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
            .await
            .map_err(|e| error_object_from_rpc(e.into()))?;

        let apys = calculate_apys(exchange_rate_table);

        Ok(ValidatorApys {
            apys,
            epoch: system_state_summary.epoch,
        })
    }
}
463
464pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
465 let mut apys = vec![];
466
467 for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
468 let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
469
470 let mean_apy = mean_apy_from_exchange_rates(exchange_rates);
471 apys.push(ValidatorApy {
472 address: rates.address,
473 apy: mean_apy,
474 });
475 }
476 apys
477}
478
479pub fn mean_apy_from_exchange_rates<'er>(
487 exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
488) -> f64 {
489 const MAX_VALID_APY: f64 = 1.00;
491 const SAMPLES: usize = 7;
492
493 let rates = exchange_rates.clone().dropping(1);
494 let rates_next = exchange_rates.dropping_back(1);
495
496 let mut apys = rates
497 .zip(rates_next)
498 .take(SAMPLES + 1)
499 .map(|(er, er_next)| calculate_apy(er, er_next))
500 .collect::<Vec<_>>();
501
502 if apys.is_empty() || apys.iter().any(|&apy| apy < 0.0) {
504 return 0.0;
505 }
506 let has_outlier = apys.get(SAMPLES).is_some_and(|&apy| apy <= 0.0)
510 || apys.iter().any(|&apy| apy > MAX_VALID_APY);
511
512 apys.truncate(SAMPLES);
513
514 if has_outlier {
515 Data::new(apys).median()
516 } else {
517 let sum: f64 = apys.iter().sum();
518 sum / SAMPLES as f64
519 }
520}
521
522fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
527 ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0
528}
529
530fn stake_status(
531 epoch: u64,
532 activation_epoch: u64,
533 principal: u64,
534 exists: bool,
535 current_rate: Option<&PoolTokenExchangeRate>,
536 rate_table: &ValidatorExchangeRates,
537) -> StakeStatus {
538 if !exists {
539 StakeStatus::Unstaked
540 } else if epoch >= activation_epoch {
541 let estimated_reward = if let Some(current_rate) = current_rate {
543 let stake_rate = rate_table
544 .rates
545 .iter()
546 .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
547 .unwrap_or_default();
548 let estimated_reward =
549 ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
550 max(0, estimated_reward.round() as u64)
551 } else {
552 0
553 };
554 StakeStatus::Active { estimated_reward }
555 } else {
556 StakeStatus::Pending
557 }
558}
559
560#[cached(
564 type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
565 create = "{ SizedCache::with_size(1) }",
566 convert = "{ _current_epoch }",
567 result = true
568)]
569async fn exchange_rates(
570 state: &Arc<dyn StateRead>,
571 _current_epoch: EpochId,
572) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
573 Ok(active_validators_exchange_rates(state)?
574 .into_iter()
575 .chain(inactive_validators_exchange_rates(state)?.into_iter())
576 .collect())
577}
578
/// Clears the cache behind `exchange_rates`.
///
/// Only compiled for `msim` builds. Does nothing when the cache has not been
/// initialized yet.
#[cfg(msim)]
pub async fn clear_exchange_rates_cache_for_testing() {
    use cached::Cached;
    // `Lazy::get` returns `None` while the cache is uninitialized, so the
    // cache is not created just to be cleared.
    if let Some(mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) {
        let mut cache = mutex.lock().await;
        cache.cache_clear();
    }
}
594
595fn validator_exchange_rates(
597 state: &Arc<dyn StateRead>,
598 tables: Vec<ValidatorTable>,
599) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
600 if tables.is_empty() {
601 return Ok(vec![]);
602 };
603
604 let mut exchange_rates = vec![];
605 for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
607 let mut rates = state
608 .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
609 .into_iter()
610 .map(|(_object_id, df)| {
611 let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
612 IotaError::ObjectDeserialization {
613 error: e.to_string(),
614 }
615 })?;
616
617 let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
618 &state.get_object_store().as_ref(),
619 exchange_rates_id,
620 &epoch,
621 )?;
622
623 Ok::<_, IotaError>((epoch, exchange_rate))
624 })
625 .collect::<Result<Vec<_>, _>>()?;
626
627 rates = backfill_rates(rates);
630
631 exchange_rates.push(ValidatorExchangeRates {
632 address,
633 pool_id,
634 active,
635 rates,
636 });
637 }
638
639 Ok(exchange_rates)
640}
641
642fn active_validators_exchange_rates(
644 state: &Arc<dyn StateRead>,
645) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
646 let system_state_summary = IotaSystemStateSummaryV2::try_from(
647 state.get_system_state()?.into_iota_system_state_summary(),
648 )?;
649
650 let tables = system_state_summary
651 .active_validators
652 .into_iter()
653 .map(|validator| {
654 (
655 validator.iota_address,
656 validator.staking_pool_id,
657 validator.exchange_rates_id,
658 validator.exchange_rates_size,
659 true,
660 )
661 })
662 .collect();
663
664 validator_exchange_rates(state, tables)
665}
666
667fn inactive_validators_exchange_rates(
669 state: &Arc<dyn StateRead>,
670) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
671 let system_state_summary = IotaSystemStateSummaryV2::try_from(
672 state.get_system_state()?.into_iota_system_state_summary(),
673 )?;
674
675 let tables = validator_summary_from_system_state(
676 state,
677 system_state_summary.inactive_pools_id,
678 system_state_summary.inactive_pools_size,
679 |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
680 )?;
681
682 validator_exchange_rates(state, tables)
683}
684
685fn pending_validators_exchange_rate(
691 state: &Arc<dyn StateRead>,
692) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
693 let system_state = state.get_system_state()?;
694 let object_store = state.get_object_store();
695
696 let tables = system_state
698 .get_pending_active_validators(object_store)?
699 .into_iter()
700 .map(|pending_active_validator| {
701 (
702 pending_active_validator.iota_address,
703 pending_active_validator.staking_pool_id,
704 pending_active_validator.exchange_rates_id,
705 pending_active_validator.exchange_rates_size,
706 false,
707 )
708 })
709 .collect::<Vec<ValidatorTable>>();
710
711 validator_exchange_rates(state, tables)
712}
713
714fn candidate_validators_exchange_rate(
720 state: &Arc<dyn StateRead>,
721) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
722 let system_state_summary = IotaSystemStateSummaryV2::try_from(
723 state.get_system_state()?.into_iota_system_state_summary(),
724 )?;
725
726 let tables = validator_summary_from_system_state(
729 state,
730 system_state_summary.validator_candidates_id,
731 system_state_summary.validator_candidates_size,
732 |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
733 )?;
734
735 validator_exchange_rates(state, tables)
736}
737
738fn validator_summary_from_system_state<K, F>(
785 state: &Arc<dyn StateRead>,
786 table_id: ObjectID,
787 limit: u64,
788 key: F,
789) -> RpcInterimResult<Vec<ValidatorTable>>
790where
791 F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
792 K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
793{
794 let object_store = state.get_object_store();
795
796 state
797 .get_dynamic_fields(table_id, None, limit as usize)?
798 .into_iter()
799 .map(|(_object_id, df)| {
800 let validator_summary = get_validator_from_table(object_store, table_id, &key(df)?)?;
801
802 Ok((
803 validator_summary.iota_address,
804 validator_summary.staking_pool_id,
805 validator_summary.exchange_rates_id,
806 validator_summary.exchange_rates_size,
807 false,
808 ))
809 })
810 .collect()
811}
812
/// Exchange rates of a single validator's staking pool.
#[derive(Clone, Debug)]
pub struct ValidatorExchangeRates {
    /// Address of the validator.
    pub address: IotaAddress,
    /// Id of the validator's staking pool.
    pub pool_id: ObjectID,
    /// Whether the validator was loaded from the active validator set.
    pub active: bool,
    /// Exchange rate per epoch. When built by `validator_exchange_rates` the
    /// entries are ordered from the newest to the oldest epoch, without gaps.
    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
}
820
821fn backfill_rates(
825 mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
826) -> Vec<(EpochId, PoolTokenExchangeRate)> {
827 if rates.is_empty() {
828 return rates;
829 }
830 rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
832
833 let (min_epoch, _) = rates.first().expect("rates should not be empty");
835 let (max_epoch, _) = rates.last().expect("rates should not be empty");
836 let expected_len = (max_epoch - min_epoch + 1) as usize;
837 let current_len = rates.len();
838
839 if current_len == expected_len {
841 rates.reverse();
842 return rates;
843 }
844
845 let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
846 let mut missing_rates = Vec::with_capacity(expected_len - current_len);
847 for (epoch_id, rate) in rates {
848 if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
850 for missing_epoch_id in prev_epoch_id + 1..epoch_id {
851 missing_rates.push((missing_epoch_id, prev_rate.clone()));
852 }
853 };
854
855 filled_rates.append(&mut missing_rates);
860 filled_rates.push((epoch_id, rate));
861 }
862 filled_rates.reverse();
863 filled_rates
864}
865
/// Exposes the governance read API as a JSON-RPC module.
impl IotaRpcModule for GovernanceReadApi {
    /// Converts the API into its `RpcModule`.
    fn rpc(self) -> RpcModule<Self> {
        self.into_rpc()
    }

    /// Returns the OpenRPC documentation module of the governance read API.
    fn rpc_doc_module() -> Module {
        GovernanceReadApiOpenRpc::module_doc()
    }
}
875
#[cfg(test)]
mod tests {
    use iota_types::iota_system_state::PoolTokenExchangeRate;

    use super::*;

    /// Loads the exchange rates stored in the JSON fixture at `fixture_path`,
    /// computes the APY of every validator in it (all treated as active) and
    /// asserts that each APY stays below 15%.
    fn assert_fixture_apys_below_limit(fixture_path: &str) {
        let file = std::fs::File::open(fixture_path).unwrap();
        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
            serde_json::from_reader(file).unwrap();

        // Maps the random test addresses back to the validator names of the
        // fixture for readable output.
        let mut address_map = BTreeMap::new();
        let mut exchange_rates = Vec::new();
        for (validator, rates_vec) in rates {
            let address = IotaAddress::random_for_testing_only();
            address_map.insert(address, validator);
            exchange_rates.push(ValidatorExchangeRates {
                address,
                pool_id: ObjectID::random(),
                active: true,
                rates: backfill_rates(rates_vec),
            });
        }

        for apy in calculate_apys(exchange_rates) {
            println!("{}: {}", address_map[&apy.address], apy.apy);
            assert!(apy.apy < 0.15)
        }
    }

    #[test]
    fn calculate_apys_with_outliers() {
        assert_fixture_apys_below_limit(
            "src/unit_tests/data/validator_exchange_rate/rates-test.json",
        );
    }

    #[test]
    fn calculate_apys_without_outliers() {
        assert_fixture_apys_below_limit(
            "src/unit_tests/data/validator_exchange_rate/rates-feb26.json",
        );
    }

    #[test]
    fn test_backfill_rates_empty() {
        let backfilled = backfill_rates(vec![]);
        assert_eq!(backfilled, vec![]);
    }

    #[test]
    fn test_backfill_rates_no_gaps() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate2 = PoolTokenExchangeRate::new_for_testing(200, 220);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);

        let backfilled = backfill_rates(vec![
            (2, rate2.clone()),
            (3, rate3.clone()),
            (1, rate1.clone()),
        ]);

        let expected: Vec<(u64, PoolTokenExchangeRate)> = vec![(3, rate3), (2, rate2), (1, rate1)];
        assert_eq!(backfilled, expected);
    }

    #[test]
    fn test_backfill_single_rate() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);

        let backfilled = backfill_rates(vec![(1, rate1.clone())]);

        assert_eq!(backfilled, vec![(1, rate1)]);
    }

    #[test]
    fn test_backfill_rates_with_gaps() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let rate5 = PoolTokenExchangeRate::new_for_testing(500, 550);

        let backfilled = backfill_rates(vec![
            (3, rate3.clone()),
            (1, rate1.clone()),
            (5, rate5.clone()),
        ]);

        // Epochs 2 and 4 are filled with the rate of the preceding epoch.
        let expected = vec![
            (5, rate5),
            (4, rate3.clone()),
            (3, rate3),
            (2, rate1.clone()),
            (1, rate1),
        ];
        assert_eq!(backfilled, expected);
    }

    #[test]
    fn test_backfill_rates_missing_middle_epoch() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);

        let backfilled = backfill_rates(vec![(1, rate1.clone()), (3, rate3.clone())]);

        let expected = vec![(3, rate3), (2, rate1.clone()), (1, rate1)];
        assert_eq!(backfilled, expected);
    }

    #[test]
    fn test_backfill_rates_missing_middle_epochs() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);

        let backfilled = backfill_rates(vec![(1, rate1.clone()), (4, rate4.clone())]);

        let expected = vec![
            (4, rate4),
            (3, rate1.clone()),
            (2, rate1.clone()),
            (1, rate1),
        ];
        assert_eq!(backfilled, expected);
    }

    #[test]
    fn test_backfill_rates_unordered_input() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);

        let backfilled = backfill_rates(vec![
            (3, rate3.clone()),
            (1, rate1.clone()),
            (4, rate4.clone()),
        ]);

        let expected = vec![(4, rate4), (3, rate3), (2, rate1.clone()), (1, rate1)];
        assert_eq!(backfilled, expected);
    }
}