1use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11 GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14 DelegatedStake, DelegatedTimelockedStake, IotaCommittee,
15 IotaSystemStateSummary as IotaSystemStateSummarySchema,
16 IotaSystemStateSummaryV1 as IotaSystemStateSummaryV1Schema, Stake, StakeStatus,
17 TimelockedStake, ValidatorApy, ValidatorApys,
18};
19use iota_metrics::spawn_monitored_task;
20use iota_open_rpc::Module;
21use iota_types::{
22 MoveTypeTagTrait,
23 base_types::{IotaAddress, ObjectID},
24 committee::EpochId,
25 dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
26 error::{IotaError, UserInputError},
27 governance::StakedIota,
28 id::ID,
29 iota_serde::BigInt,
30 iota_system_state::{
31 IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
32 iota_system_state_summary::{IotaSystemStateSummaryV1, IotaSystemStateSummaryV2},
33 },
34 object::{Object, ObjectRead},
35 timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44 IotaRpcModule, ObjectProvider,
45 authority_state::StateRead,
46 error::{Error, IotaRpcInputError, RpcInterimResult},
47 logger::FutureWithTracing as _,
48};
49
50type ValidatorTable = (IotaAddress, ObjectID, ObjectID, u64, bool);
51
52#[derive(Clone)]
53pub struct GovernanceReadApi {
54 state: Arc<dyn StateRead>,
55 pub metrics: Arc<JsonRpcMetrics>,
56}
57
58impl GovernanceReadApi {
59 pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60 Self { state, metrics }
61 }
62
63 async fn get_staked_iota(&self, owner: IotaAddress) -> Result<Vec<StakedIota>, Error> {
64 let state = self.state.clone();
65 let result =
66 spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68 self.metrics
69 .get_stake_iota_result_size
70 .observe(result.len() as f64);
71 self.metrics
72 .get_stake_iota_result_size_total
73 .inc_by(result.len() as u64);
74 Ok(result)
75 }
76
77 async fn get_timelocked_staked_iota(
78 &self,
79 owner: IotaAddress,
80 ) -> Result<Vec<TimelockedStakedIota>, Error> {
81 let state = self.state.clone();
82 let result =
83 spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84 .await??;
85
86 self.metrics
87 .get_stake_iota_result_size
88 .observe(result.len() as f64);
89 self.metrics
90 .get_stake_iota_result_size_total
91 .inc_by(result.len() as u64);
92 Ok(result)
93 }
94
95 async fn get_stakes_by_ids(
96 &self,
97 staked_iota_ids: Vec<ObjectID>,
98 ) -> Result<Vec<DelegatedStake>, Error> {
99 let state = self.state.clone();
100 let stakes_read = spawn_monitored_task!(async move {
101 staked_iota_ids
102 .iter()
103 .map(|id| state.get_object_read(id))
104 .collect::<Result<Vec<_>, _>>()
105 })
106 .await??;
107
108 if stakes_read.is_empty() {
109 return Ok(vec![]);
110 }
111
112 let stakes: Vec<(StakedIota, bool)> = self
113 .stakes_with_status(stakes_read.into_iter())
114 .await?
115 .into_iter()
116 .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117 .collect::<Result<_, _>>()?;
118
119 self.get_delegated_stakes(stakes).await
120 }
121
122 async fn get_stakes(&self, owner: IotaAddress) -> Result<Vec<DelegatedStake>, Error> {
123 let timer = self.metrics.get_stake_iota_latency.start_timer();
124 let stakes = self.get_staked_iota(owner).await?;
125 if stakes.is_empty() {
126 return Ok(vec![]);
127 }
128 drop(timer);
129
130 let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132 let self_clone = self.clone();
133 spawn_monitored_task!(
134 self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135 )
136 .await?
137 }
138
139 async fn get_timelocked_stakes_by_ids(
140 &self,
141 timelocked_staked_iota_ids: Vec<ObjectID>,
142 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143 let state = self.state.clone();
144 let stakes_read = spawn_monitored_task!(async move {
145 timelocked_staked_iota_ids
146 .iter()
147 .map(|id| state.get_object_read(id))
148 .collect::<Result<Vec<_>, _>>()
149 })
150 .await??;
151
152 if stakes_read.is_empty() {
153 return Ok(vec![]);
154 }
155
156 let stakes: Vec<(TimelockedStakedIota, bool)> = self
157 .stakes_with_status(stakes_read.into_iter())
158 .await?
159 .into_iter()
160 .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161 .collect::<Result<_, _>>()?;
162
163 self.get_delegated_timelocked_stakes(stakes).await
164 }
165
166 async fn get_timelocked_stakes(
167 &self,
168 owner: IotaAddress,
169 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170 let timer = self.metrics.get_stake_iota_latency.start_timer();
171 let stakes = self.get_timelocked_staked_iota(owner).await?;
172 if stakes.is_empty() {
173 return Ok(vec![]);
174 }
175 drop(timer);
176
177 let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179 let self_clone = self.clone();
180 spawn_monitored_task!(
181 self_clone
182 .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183 )
184 .await?
185 }
186
187 async fn get_delegated_stakes(
188 &self,
189 stakes: Vec<(StakedIota, bool)>,
190 ) -> Result<Vec<DelegatedStake>, Error> {
191 let pools = stakes.into_iter().fold(
192 BTreeMap::<_, Vec<_>>::new(),
193 |mut pools, (stake, exists)| {
194 pools
195 .entry(stake.pool_id())
196 .or_default()
197 .push((stake, exists));
198 pools
199 },
200 );
201
202 let system_state = self.get_system_state()?;
203 let system_state_summary = IotaSystemStateSummaryV2::try_from(
204 system_state.clone().into_iota_system_state_summary(),
205 )?;
206
207 let rates = exchange_rates(&self.state, system_state_summary.epoch)
208 .await?
209 .into_iter()
210 .chain(candidate_validators_exchange_rate(&self.state)?)
212 .chain(pending_validators_exchange_rate(&self.state)?)
214 .map(|rates| (rates.pool_id, rates))
215 .collect::<BTreeMap<_, _>>();
216
217 let mut delegated_stakes = vec![];
218 for (pool_id, stakes) in pools {
219 let rate_table = rates.get(&pool_id).ok_or_else(|| {
221 IotaRpcInputError::GenericNotFound(format!(
222 "Cannot find rates for staking pool {pool_id}"
223 ))
224 })?;
225 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227 let mut delegations = vec![];
228 for (stake, exists) in stakes {
229 let status = stake_status(
230 system_state_summary.epoch,
231 stake.activation_epoch(),
232 stake.principal(),
233 exists,
234 current_rate,
235 rate_table,
236 );
237 delegations.push(Stake {
238 staked_iota_id: stake.id(),
239 stake_request_epoch: stake.activation_epoch() - 1,
241 stake_active_epoch: stake.activation_epoch(),
242 principal: stake.principal(),
243 status,
244 })
245 }
246 delegated_stakes.push(DelegatedStake {
247 validator_address: rate_table.address,
248 staking_pool: pool_id,
249 stakes: delegations,
250 })
251 }
252 Ok(delegated_stakes)
253 }
254
255 async fn get_delegated_timelocked_stakes(
256 &self,
257 stakes: Vec<(TimelockedStakedIota, bool)>,
258 ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259 let pools = stakes.into_iter().fold(
260 BTreeMap::<_, Vec<_>>::new(),
261 |mut pools, (stake, exists)| {
262 pools
263 .entry(stake.pool_id())
264 .or_default()
265 .push((stake, exists));
266 pools
267 },
268 );
269
270 let system_state = self.get_system_state()?;
271 let system_state_summary = IotaSystemStateSummaryV2::try_from(
272 system_state.clone().into_iota_system_state_summary(),
273 )?;
274
275 let rates = exchange_rates(&self.state, system_state_summary.epoch)
276 .await?
277 .into_iter()
278 .chain(candidate_validators_exchange_rate(&self.state)?)
280 .chain(pending_validators_exchange_rate(&self.state)?)
282 .map(|rates| (rates.pool_id, rates))
283 .collect::<BTreeMap<_, _>>();
284
285 let mut delegated_stakes = vec![];
286 for (pool_id, stakes) in pools {
287 let rate_table = rates.get(&pool_id).ok_or_else(|| {
289 IotaRpcInputError::GenericNotFound(format!(
290 "Cannot find rates for staking pool {pool_id}"
291 ))
292 })?;
293 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
294
295 let mut delegations = vec![];
296 for (stake, exists) in stakes {
297 let status = stake_status(
298 system_state_summary.epoch,
299 stake.activation_epoch(),
300 stake.principal(),
301 exists,
302 current_rate,
303 rate_table,
304 );
305 delegations.push(TimelockedStake {
306 timelocked_staked_iota_id: stake.id(),
307 stake_request_epoch: stake.activation_epoch() - 1,
309 stake_active_epoch: stake.activation_epoch(),
310 principal: stake.principal(),
311 status,
312 expiration_timestamp_ms: stake.expiration_timestamp_ms(),
313 label: stake.label().clone(),
314 })
315 }
316 delegated_stakes.push(DelegatedTimelockedStake {
317 validator_address: rate_table.address,
318 staking_pool: pool_id,
319 stakes: delegations,
320 })
321 }
322 Ok(delegated_stakes)
323 }
324
325 async fn stakes_with_status(
326 &self,
327 iter: impl Iterator<Item = ObjectRead>,
328 ) -> Result<Vec<(Object, bool)>, Error> {
329 let mut stakes = vec![];
330
331 for stake in iter {
332 match stake {
333 ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
334 ObjectRead::Deleted(object_ref) => {
335 let Some(o) = self
336 .state
337 .find_object_lt_or_eq_version(
338 &object_ref.object_id,
339 &object_ref.version.previous().unwrap(),
340 )
341 .await?
342 else {
343 Err(IotaRpcInputError::UserInput(
344 UserInputError::ObjectNotFound {
345 object_id: object_ref.object_id,
346 version: None,
347 },
348 ))?
349 };
350 stakes.push((o, false));
351 }
352 ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
353 UserInputError::ObjectNotFound {
354 object_id: id,
355 version: None,
356 },
357 ))?,
358 }
359 }
360
361 Ok(stakes)
362 }
363
364 fn get_system_state(&self) -> Result<IotaSystemState, Error> {
365 Ok(self.state.get_system_state()?)
366 }
367}
368
369#[async_trait]
370impl GovernanceReadApiServer for GovernanceReadApi {
371 #[instrument(skip(self, staked_iota_ids), fields(staked_iota_ids = staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
372 async fn get_stakes_by_ids(
373 &self,
374 staked_iota_ids: Vec<ObjectID>,
375 ) -> RpcResult<Vec<DelegatedStake>> {
376 self.get_stakes_by_ids(staked_iota_ids).trace().await
377 }
378
379 #[instrument(skip(self, owner), fields(owner = %owner))]
380 async fn get_stakes(&self, owner: IotaAddress) -> RpcResult<Vec<DelegatedStake>> {
381 self.get_stakes(owner).trace().await
382 }
383
384 #[instrument(skip(self, timelocked_staked_iota_ids), fields(timelocked_staked_iota_ids = timelocked_staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
385 async fn get_timelocked_stakes_by_ids(
386 &self,
387 timelocked_staked_iota_ids: Vec<ObjectID>,
388 ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
389 self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
390 .trace()
391 .await
392 }
393
394 #[instrument(skip(self, owner), fields(owner = %owner))]
395 async fn get_timelocked_stakes(
396 &self,
397 owner: IotaAddress,
398 ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
399 self.get_timelocked_stakes(owner).trace().await
400 }
401
402 #[instrument(skip(self))]
403 async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
404 async move {
405 self.state
406 .get_or_latest_committee(epoch)
407 .map(|committee| committee.into())
408 .map_err(Error::from)
409 }
410 .trace()
411 .await
412 }
413
414 #[instrument(skip(self))]
415 async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummarySchema> {
416 async move {
417 Ok(self
418 .state
419 .get_system_state()?
420 .into_iota_system_state_summary()
421 .into())
422 }
423 .trace()
424 .await
425 }
426
427 #[instrument(skip(self))]
428 async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1Schema> {
429 async move {
430 Ok(IotaSystemStateSummaryV1::try_from(
431 self.state
432 .get_system_state()?
433 .into_iota_system_state_summary(),
434 )?
435 .into())
436 }
437 .trace()
438 .await
439 }
440
441 #[instrument(skip(self))]
442 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
443 async move {
444 let epoch_store = self.state.load_epoch_store_one_call_per_task();
445 Ok(epoch_store.reference_gas_price().into())
446 }
447 .trace()
448 .await
449 }
450
451 #[instrument(skip(self))]
452 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
453 info!("get_validator_apy");
454 let system_state_summary = self.get_latest_iota_system_state().await?;
455
456 let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
457 .await
458 .map_err(|e| error_object_from_rpc(e.into()))?;
459
460 let apys = calculate_apys(exchange_rate_table);
461
462 Ok(ValidatorApys {
463 apys,
464 epoch: system_state_summary.epoch,
465 })
466 }
467}
468
469pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
470 let mut apys = vec![];
471
472 for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
473 let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
474
475 let mean_apy = mean_apy_from_exchange_rates(exchange_rates);
476 apys.push(ValidatorApy {
477 address: rates.address,
478 apy: mean_apy,
479 });
480 }
481 apys
482}
483
484pub fn mean_apy_from_exchange_rates<'er>(
492 exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
493) -> f64 {
494 const MAX_VALID_APY: f64 = 1.00;
496 const SAMPLES: usize = 7;
497
498 let rates = exchange_rates.clone().dropping(1);
499 let rates_next = exchange_rates.dropping_back(1);
500
501 let mut apys = rates
502 .zip(rates_next)
503 .take(SAMPLES + 1)
504 .map(|(er, er_next)| calculate_apy(er, er_next))
505 .collect::<Vec<_>>();
506
507 if apys.is_empty() || apys.iter().any(|&apy| apy < 0.0) {
509 return 0.0;
510 }
511 let has_outlier = apys.get(SAMPLES).is_some_and(|&apy| apy <= 0.0)
515 || apys.iter().any(|&apy| apy > MAX_VALID_APY);
516
517 apys.truncate(SAMPLES);
518
519 if has_outlier {
520 Data::new(apys).median()
521 } else {
522 let sum: f64 = apys.iter().sum();
523 sum / SAMPLES as f64
524 }
525}
526
527fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
532 ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0
533}
534
535fn stake_status(
536 epoch: u64,
537 activation_epoch: u64,
538 principal: u64,
539 exists: bool,
540 current_rate: Option<&PoolTokenExchangeRate>,
541 rate_table: &ValidatorExchangeRates,
542) -> StakeStatus {
543 if !exists {
544 StakeStatus::Unstaked
545 } else if epoch >= activation_epoch {
546 let estimated_reward = if let Some(current_rate) = current_rate {
548 let stake_rate = rate_table
549 .rates
550 .iter()
551 .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
552 .unwrap_or_default();
553 let estimated_reward =
554 ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
555 max(0, estimated_reward.round() as u64)
556 } else {
557 0
558 };
559 StakeStatus::Active { estimated_reward }
560 } else {
561 StakeStatus::Pending
562 }
563}
564
565#[cached(
569 type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
570 create = "{ SizedCache::with_size(1) }",
571 convert = "{ _current_epoch }",
572 result = true
573)]
574async fn exchange_rates(
575 state: &Arc<dyn StateRead>,
576 _current_epoch: EpochId,
577) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
578 Ok(active_validators_exchange_rates(state)?
579 .into_iter()
580 .chain(inactive_validators_exchange_rates(state)?.into_iter())
581 .collect())
582}
583
/// Empties the `exchange_rates` cache. Only compiled under `cfg(msim)`.
///
/// Does nothing if the cache has not been initialized yet.
#[cfg(msim)]
pub async fn clear_exchange_rates_cache_for_testing() {
    use cached::Cached;

    let Some(mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) else {
        return;
    };
    mutex.lock().await.cache_clear();
}
599
600fn validator_exchange_rates(
602 state: &Arc<dyn StateRead>,
603 tables: Vec<ValidatorTable>,
604) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
605 if tables.is_empty() {
606 return Ok(vec![]);
607 };
608
609 let mut exchange_rates = vec![];
610 for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
612 let mut rates = state
613 .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
614 .into_iter()
615 .map(|(_object_id, df)| {
616 let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
617 IotaError::ObjectDeserialization {
618 error: e.to_string(),
619 }
620 })?;
621
622 let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
623 &state.get_object_store().as_ref(),
624 exchange_rates_id,
625 &epoch,
626 )?;
627
628 Ok::<_, IotaError>((epoch, exchange_rate))
629 })
630 .collect::<Result<Vec<_>, _>>()?;
631
632 rates = backfill_rates(rates);
635
636 exchange_rates.push(ValidatorExchangeRates {
637 address,
638 pool_id,
639 active,
640 rates,
641 });
642 }
643
644 Ok(exchange_rates)
645}
646
647fn active_validators_exchange_rates(
649 state: &Arc<dyn StateRead>,
650) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
651 let system_state_summary = IotaSystemStateSummaryV2::try_from(
652 state.get_system_state()?.into_iota_system_state_summary(),
653 )?;
654
655 let tables = system_state_summary
656 .active_validators
657 .into_iter()
658 .map(|validator| {
659 (
660 validator.iota_address,
661 validator.staking_pool_id,
662 validator.exchange_rates_id,
663 validator.exchange_rates_size,
664 true,
665 )
666 })
667 .collect();
668
669 validator_exchange_rates(state, tables)
670}
671
672fn inactive_validators_exchange_rates(
674 state: &Arc<dyn StateRead>,
675) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
676 let system_state_summary = IotaSystemStateSummaryV2::try_from(
677 state.get_system_state()?.into_iota_system_state_summary(),
678 )?;
679
680 let tables = validator_summary_from_system_state(
681 state,
682 system_state_summary.inactive_pools_id,
683 system_state_summary.inactive_pools_size,
684 |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
685 Some(system_state_summary.protocol_version),
686 )?;
687
688 validator_exchange_rates(state, tables)
689}
690
691fn pending_validators_exchange_rate(
697 state: &Arc<dyn StateRead>,
698) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
699 let system_state = state.get_system_state()?;
700 let object_store = state.get_object_store();
701
702 let tables = system_state
704 .get_pending_active_validators(object_store)?
705 .into_iter()
706 .map(|pending_active_validator| {
707 (
708 pending_active_validator.iota_address,
709 pending_active_validator.staking_pool_id,
710 pending_active_validator.exchange_rates_id,
711 pending_active_validator.exchange_rates_size,
712 false,
713 )
714 })
715 .collect::<Vec<ValidatorTable>>();
716
717 validator_exchange_rates(state, tables)
718}
719
720fn candidate_validators_exchange_rate(
726 state: &Arc<dyn StateRead>,
727) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
728 let system_state_summary = IotaSystemStateSummaryV2::try_from(
729 state.get_system_state()?.into_iota_system_state_summary(),
730 )?;
731
732 let tables = validator_summary_from_system_state(
735 state,
736 system_state_summary.validator_candidates_id,
737 system_state_summary.validator_candidates_size,
738 |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
739 Some(system_state_summary.protocol_version),
740 )?;
741
742 validator_exchange_rates(state, tables)
743}
744
745fn validator_summary_from_system_state<K, F>(
792 state: &Arc<dyn StateRead>,
793 table_id: ObjectID,
794 limit: u64,
795 key: F,
796 protocol_version: Option<u64>,
797) -> RpcInterimResult<Vec<ValidatorTable>>
798where
799 F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
800 K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
801{
802 let object_store = state.get_object_store();
803
804 state
805 .get_dynamic_fields(table_id, None, limit as usize)?
806 .into_iter()
807 .map(|(_object_id, df)| {
808 let validator_summary =
809 get_validator_from_table(object_store, table_id, &key(df)?, protocol_version)?;
810
811 Ok((
812 validator_summary.iota_address,
813 validator_summary.staking_pool_id,
814 validator_summary.exchange_rates_id,
815 validator_summary.exchange_rates_size,
816 false,
817 ))
818 })
819 .collect()
820}
821
822#[derive(Clone, Debug)]
823pub struct ValidatorExchangeRates {
824 pub address: IotaAddress,
825 pub pool_id: ObjectID,
826 pub active: bool,
827 pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
828}
829
830fn backfill_rates(
834 mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
835) -> Vec<(EpochId, PoolTokenExchangeRate)> {
836 if rates.is_empty() {
837 return rates;
838 }
839 rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
841
842 let (min_epoch, _) = rates.first().expect("rates should not be empty");
844 let (max_epoch, _) = rates.last().expect("rates should not be empty");
845 let expected_len = (max_epoch - min_epoch + 1) as usize;
846 let current_len = rates.len();
847
848 if current_len == expected_len {
850 rates.reverse();
851 return rates;
852 }
853
854 let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
855 let mut missing_rates = Vec::with_capacity(expected_len - current_len);
856 for (epoch_id, rate) in rates {
857 if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
859 for missing_epoch_id in prev_epoch_id + 1..epoch_id {
860 missing_rates.push((missing_epoch_id, prev_rate.clone()));
861 }
862 };
863
864 filled_rates.append(&mut missing_rates);
869 filled_rates.push((epoch_id, rate));
870 }
871 filled_rates.reverse();
872 filled_rates
873}
874
875impl IotaRpcModule for GovernanceReadApi {
876 fn rpc(self) -> RpcModule<Self> {
877 self.into_rpc()
878 }
879
880 fn rpc_doc_module() -> Module {
881 GovernanceReadApiOpenRpc::module_doc()
882 }
883}
884
#[cfg(test)]
mod tests {
    use iota_types::iota_system_state::PoolTokenExchangeRate;

    use super::*;

    /// Loads a JSON map of validator name to `(epoch, exchange rate)` pairs
    /// from `rates_file`, computes the APY of every validator and asserts
    /// that each one stays below 0.15.
    fn assert_apys_below_threshold(rates_file: &str) {
        let file = std::fs::File::open(rates_file).unwrap();
        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
            serde_json::from_reader(file).unwrap();

        // Remembers which random address stands for which validator so the
        // output stays readable.
        let mut address_map = BTreeMap::new();

        let exchange_rates = rates
            .into_iter()
            .map(|(validator, rates_vec)| {
                let address = IotaAddress::random();
                address_map.insert(address, validator);
                ValidatorExchangeRates {
                    address,
                    pool_id: ObjectID::random(),
                    active: true,
                    rates: backfill_rates(rates_vec),
                }
            })
            .collect();

        let apys = calculate_apys(exchange_rates);

        for apy in &apys {
            println!("{}: {}", address_map[&apy.address], apy.apy);
            assert!(apy.apy < 0.15)
        }
    }

    #[test]
    fn calculate_apys_with_outliers() {
        assert_apys_below_threshold("src/unit_tests/data/validator_exchange_rate/rates-test.json");
    }

    #[test]
    fn calculate_apys_without_outliers() {
        assert_apys_below_threshold(
            "src/unit_tests/data/validator_exchange_rate/rates-feb26.json",
        );
    }

    #[test]
    fn test_backfill_rates_empty() {
        let rates = vec![];
        assert_eq!(backfill_rates(rates), vec![]);
    }

    #[test]
    fn test_backfill_rates_no_gaps() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate2 = PoolTokenExchangeRate::new_for_testing(200, 220);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let rates = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];

        let expected: Vec<(u64, PoolTokenExchangeRate)> = vec![(3, rate3), (2, rate2), (1, rate1)];
        assert_eq!(backfill_rates(rates), expected);
    }

    #[test]
    fn test_backfill_single_rate() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rates = vec![(1, rate1.clone())];
        let expected = vec![(1, rate1)];
        assert_eq!(backfill_rates(rates), expected);
    }

    #[test]
    fn test_backfill_rates_with_gaps() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let rate5 = PoolTokenExchangeRate::new_for_testing(500, 550);
        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];

        let expected = vec![
            (5, rate5),
            (4, rate3.clone()),
            (3, rate3),
            (2, rate1.clone()),
            (1, rate1),
        ];
        assert_eq!(backfill_rates(rates), expected);
    }

    #[test]
    fn test_backfill_rates_missing_middle_epoch() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let rates = vec![(1, rate1.clone()), (3, rate3.clone())];
        let expected = vec![(3, rate3), (2, rate1.clone()), (1, rate1)];
        assert_eq!(backfill_rates(rates), expected);
    }

    #[test]
    fn test_backfill_rates_missing_middle_epochs() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
        let rates = vec![(1, rate1.clone()), (4, rate4.clone())];
        let expected = vec![
            (4, rate4),
            (3, rate1.clone()),
            (2, rate1.clone()),
            (1, rate1),
        ];
        assert_eq!(backfill_rates(rates), expected);
    }

    #[test]
    fn test_backfill_rates_unordered_input() {
        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (4, rate4.clone())];
        let expected = vec![(4, rate4), (3, rate3), (2, rate1.clone()), (1, rate1)];
        assert_eq!(backfill_rates(rates), expected);
    }
}