iota_json_rpc/
governance_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11    GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14    DelegatedStake, DelegatedTimelockedStake, IotaCommittee, Stake, StakeStatus, TimelockedStake,
15    ValidatorApy, ValidatorApys,
16};
17use iota_metrics::spawn_monitored_task;
18use iota_open_rpc::Module;
19use iota_types::{
20    MoveTypeTagTrait,
21    base_types::{IotaAddress, ObjectID},
22    committee::EpochId,
23    dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
24    error::{IotaError, UserInputError},
25    governance::StakedIota,
26    id::ID,
27    iota_serde::BigInt,
28    iota_system_state::{
29        IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
30        iota_system_state_summary::{
31            IotaSystemStateSummary, IotaSystemStateSummaryV1, IotaSystemStateSummaryV2,
32        },
33    },
34    object::{Object, ObjectRead},
35    timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44    IotaRpcModule, ObjectProvider,
45    authority_state::StateRead,
46    error::{Error, IotaRpcInputError, RpcInterimResult},
47    logger::FutureWithTracing as _,
48};
49
/// Lookup data for one validator, in tuple order: validator address, staking
/// pool ID, ID of the exchange rates table, size of the exchange rates table,
/// and whether the validator is active.
type ValidatorTable = (IotaAddress, ObjectID, ObjectID, u64, bool);
51
/// JSON-RPC handler for the governance read API.
#[derive(Clone)]
pub struct GovernanceReadApi {
    /// Read-only view of the node state used to serve every request.
    state: Arc<dyn StateRead>,
    /// Metrics updated while serving stake queries.
    pub metrics: Arc<JsonRpcMetrics>,
}
57
58impl GovernanceReadApi {
59    pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60        Self { state, metrics }
61    }
62
63    async fn get_staked_iota(&self, owner: IotaAddress) -> Result<Vec<StakedIota>, Error> {
64        let state = self.state.clone();
65        let result =
66            spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68        self.metrics
69            .get_stake_iota_result_size
70            .observe(result.len() as f64);
71        self.metrics
72            .get_stake_iota_result_size_total
73            .inc_by(result.len() as u64);
74        Ok(result)
75    }
76
77    async fn get_timelocked_staked_iota(
78        &self,
79        owner: IotaAddress,
80    ) -> Result<Vec<TimelockedStakedIota>, Error> {
81        let state = self.state.clone();
82        let result =
83            spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84                .await??;
85
86        self.metrics
87            .get_stake_iota_result_size
88            .observe(result.len() as f64);
89        self.metrics
90            .get_stake_iota_result_size_total
91            .inc_by(result.len() as u64);
92        Ok(result)
93    }
94
95    async fn get_stakes_by_ids(
96        &self,
97        staked_iota_ids: Vec<ObjectID>,
98    ) -> Result<Vec<DelegatedStake>, Error> {
99        let state = self.state.clone();
100        let stakes_read = spawn_monitored_task!(async move {
101            staked_iota_ids
102                .iter()
103                .map(|id| state.get_object_read(id))
104                .collect::<Result<Vec<_>, _>>()
105        })
106        .await??;
107
108        if stakes_read.is_empty() {
109            return Ok(vec![]);
110        }
111
112        let stakes: Vec<(StakedIota, bool)> = self
113            .stakes_with_status(stakes_read.into_iter())
114            .await?
115            .into_iter()
116            .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117            .collect::<Result<_, _>>()?;
118
119        self.get_delegated_stakes(stakes).await
120    }
121
122    async fn get_stakes(&self, owner: IotaAddress) -> Result<Vec<DelegatedStake>, Error> {
123        let timer = self.metrics.get_stake_iota_latency.start_timer();
124        let stakes = self.get_staked_iota(owner).await?;
125        if stakes.is_empty() {
126            return Ok(vec![]);
127        }
128        drop(timer);
129
130        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132        let self_clone = self.clone();
133        spawn_monitored_task!(
134            self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135        )
136        .await?
137    }
138
139    async fn get_timelocked_stakes_by_ids(
140        &self,
141        timelocked_staked_iota_ids: Vec<ObjectID>,
142    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143        let state = self.state.clone();
144        let stakes_read = spawn_monitored_task!(async move {
145            timelocked_staked_iota_ids
146                .iter()
147                .map(|id| state.get_object_read(id))
148                .collect::<Result<Vec<_>, _>>()
149        })
150        .await??;
151
152        if stakes_read.is_empty() {
153            return Ok(vec![]);
154        }
155
156        let stakes: Vec<(TimelockedStakedIota, bool)> = self
157            .stakes_with_status(stakes_read.into_iter())
158            .await?
159            .into_iter()
160            .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161            .collect::<Result<_, _>>()?;
162
163        self.get_delegated_timelocked_stakes(stakes).await
164    }
165
166    async fn get_timelocked_stakes(
167        &self,
168        owner: IotaAddress,
169    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170        let timer = self.metrics.get_stake_iota_latency.start_timer();
171        let stakes = self.get_timelocked_staked_iota(owner).await?;
172        if stakes.is_empty() {
173            return Ok(vec![]);
174        }
175        drop(timer);
176
177        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179        let self_clone = self.clone();
180        spawn_monitored_task!(
181            self_clone
182                .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183        )
184        .await?
185    }
186
187    async fn get_delegated_stakes(
188        &self,
189        stakes: Vec<(StakedIota, bool)>,
190    ) -> Result<Vec<DelegatedStake>, Error> {
191        let pools = stakes.into_iter().fold(
192            BTreeMap::<_, Vec<_>>::new(),
193            |mut pools, (stake, exists)| {
194                pools
195                    .entry(stake.pool_id())
196                    .or_default()
197                    .push((stake, exists));
198                pools
199            },
200        );
201
202        let system_state = self.get_system_state()?;
203        let system_state_summary = IotaSystemStateSummaryV2::try_from(
204            system_state.clone().into_iota_system_state_summary(),
205        )?;
206
207        let rates = exchange_rates(&self.state, system_state_summary.epoch)
208            .await?
209            .into_iter()
210            // Try to find for any candidate validator exchange rate
211            .chain(candidate_validators_exchange_rate(&self.state)?.into_iter())
212            // Try to find for any pending validator exchange rate
213            .chain(pending_validators_exchange_rate(&self.state)?.into_iter())
214            .map(|rates| (rates.pool_id, rates))
215            .collect::<BTreeMap<_, _>>();
216
217        let mut delegated_stakes = vec![];
218        for (pool_id, stakes) in pools {
219            // Rate table and rate can be null when the pool is not active
220            let rate_table = rates.get(&pool_id).ok_or_else(|| {
221                IotaRpcInputError::GenericNotFound(format!(
222                    "Cannot find rates for staking pool {pool_id}"
223                ))
224            })?;
225            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227            let mut delegations = vec![];
228            for (stake, exists) in stakes {
229                let status = stake_status(
230                    system_state_summary.epoch,
231                    stake.activation_epoch(),
232                    stake.principal(),
233                    exists,
234                    current_rate,
235                    rate_table,
236                );
237                delegations.push(Stake {
238                    staked_iota_id: stake.id(),
239                    // TODO: this might change when we implement warm up period.
240                    stake_request_epoch: stake.activation_epoch() - 1,
241                    stake_active_epoch: stake.activation_epoch(),
242                    principal: stake.principal(),
243                    status,
244                })
245            }
246            delegated_stakes.push(DelegatedStake {
247                validator_address: rate_table.address,
248                staking_pool: pool_id,
249                stakes: delegations,
250            })
251        }
252        Ok(delegated_stakes)
253    }
254
255    async fn get_delegated_timelocked_stakes(
256        &self,
257        stakes: Vec<(TimelockedStakedIota, bool)>,
258    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259        let pools = stakes.into_iter().fold(
260            BTreeMap::<_, Vec<_>>::new(),
261            |mut pools, (stake, exists)| {
262                pools
263                    .entry(stake.pool_id())
264                    .or_default()
265                    .push((stake, exists));
266                pools
267            },
268        );
269
270        let system_state = self.get_system_state()?;
271        let system_state_summary = IotaSystemStateSummaryV2::try_from(
272            system_state.clone().into_iota_system_state_summary(),
273        )?;
274
275        let rates = exchange_rates(&self.state, system_state_summary.epoch)
276            .await?
277            .into_iter()
278            // Try to find for any candidate validator exchange rate
279            .chain(candidate_validators_exchange_rate(&self.state)?)
280            // Try to find for any pending validator exchange rate
281            .chain(pending_validators_exchange_rate(&self.state)?)
282            .map(|rates| (rates.pool_id, rates))
283            .collect::<BTreeMap<_, _>>();
284
285        let mut delegated_stakes = vec![];
286        for (pool_id, stakes) in pools {
287            // Rate table and rate can be null when the pool is not active
288            let rate_table = rates.get(&pool_id).ok_or_else(|| {
289                IotaRpcInputError::GenericNotFound(format!(
290                    "Cannot find rates for staking pool {pool_id}"
291                ))
292            })?;
293            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
294
295            let mut delegations = vec![];
296            for (stake, exists) in stakes {
297                let status = stake_status(
298                    system_state_summary.epoch,
299                    stake.activation_epoch(),
300                    stake.principal(),
301                    exists,
302                    current_rate,
303                    rate_table,
304                );
305                delegations.push(TimelockedStake {
306                    timelocked_staked_iota_id: stake.id(),
307                    // TODO: this might change when we implement warm up period.
308                    stake_request_epoch: stake.activation_epoch() - 1,
309                    stake_active_epoch: stake.activation_epoch(),
310                    principal: stake.principal(),
311                    status,
312                    expiration_timestamp_ms: stake.expiration_timestamp_ms(),
313                    label: stake.label().clone(),
314                })
315            }
316            delegated_stakes.push(DelegatedTimelockedStake {
317                validator_address: rate_table.address,
318                staking_pool: pool_id,
319                stakes: delegations,
320            })
321        }
322        Ok(delegated_stakes)
323    }
324
325    async fn stakes_with_status(
326        &self,
327        iter: impl Iterator<Item = ObjectRead>,
328    ) -> Result<Vec<(Object, bool)>, Error> {
329        let mut stakes = vec![];
330
331        for stake in iter {
332            match stake {
333                ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
334                ObjectRead::Deleted((object_id, version, _)) => {
335                    let Some(o) = self
336                        .state
337                        .find_object_lt_or_eq_version(&object_id, &version.one_before().unwrap())
338                        .await?
339                    else {
340                        Err(IotaRpcInputError::UserInput(
341                            UserInputError::ObjectNotFound {
342                                object_id,
343                                version: None,
344                            },
345                        ))?
346                    };
347                    stakes.push((o, false));
348                }
349                ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
350                    UserInputError::ObjectNotFound {
351                        object_id: id,
352                        version: None,
353                    },
354                ))?,
355            }
356        }
357
358        Ok(stakes)
359    }
360
361    fn get_system_state(&self) -> Result<IotaSystemState, Error> {
362        Ok(self.state.get_system_state()?)
363    }
364}
365
366#[async_trait]
367impl GovernanceReadApiServer for GovernanceReadApi {
368    #[instrument(skip(self))]
369    async fn get_stakes_by_ids(
370        &self,
371        staked_iota_ids: Vec<ObjectID>,
372    ) -> RpcResult<Vec<DelegatedStake>> {
373        self.get_stakes_by_ids(staked_iota_ids).trace().await
374    }
375
376    #[instrument(skip(self))]
377    async fn get_stakes(&self, owner: IotaAddress) -> RpcResult<Vec<DelegatedStake>> {
378        self.get_stakes(owner).trace().await
379    }
380
381    #[instrument(skip(self))]
382    async fn get_timelocked_stakes_by_ids(
383        &self,
384        timelocked_staked_iota_ids: Vec<ObjectID>,
385    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
386        self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
387            .trace()
388            .await
389    }
390
391    #[instrument(skip(self))]
392    async fn get_timelocked_stakes(
393        &self,
394        owner: IotaAddress,
395    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
396        self.get_timelocked_stakes(owner).trace().await
397    }
398
399    #[instrument(skip(self))]
400    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
401        async move {
402            self.state
403                .get_or_latest_committee(epoch)
404                .map(|committee| committee.into())
405                .map_err(Error::from)
406        }
407        .trace()
408        .await
409    }
410
411    #[instrument(skip(self))]
412    async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummary> {
413        async move {
414            Ok(self
415                .state
416                .get_system_state()?
417                .into_iota_system_state_summary())
418        }
419        .trace()
420        .await
421    }
422
423    #[instrument(skip(self))]
424    async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1> {
425        async move {
426            Ok(self
427                .state
428                .get_system_state()?
429                .into_iota_system_state_summary()
430                .try_into()?)
431        }
432        .trace()
433        .await
434    }
435
436    #[instrument(skip(self))]
437    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
438        async move {
439            let epoch_store = self.state.load_epoch_store_one_call_per_task();
440            Ok(epoch_store.reference_gas_price().into())
441        }
442        .trace()
443        .await
444    }
445
446    #[instrument(skip(self))]
447    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
448        info!("get_validator_apy");
449        let system_state_summary = self.get_latest_iota_system_state().await?;
450
451        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
452            .await
453            .map_err(|e| error_object_from_rpc(e.into()))?;
454
455        let apys = calculate_apys(exchange_rate_table);
456
457        Ok(ValidatorApys {
458            apys,
459            epoch: system_state_summary.epoch,
460        })
461    }
462}
463
464pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
465    let mut apys = vec![];
466
467    for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
468        let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
469
470        let median_apy = median_apy_from_exchange_rates(exchange_rates);
471        apys.push(ValidatorApy {
472            address: rates.address,
473            apy: median_apy,
474        });
475    }
476    apys
477}
478
479/// Calculate the APY for a validator based on the exchange rates of the staking
480/// pool.
481///
482/// The calculation uses the median value of the sample, to filter out
483/// outliers introduced by large staking/unstaking events.
484pub fn median_apy_from_exchange_rates<'er>(
485    exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
486) -> f64 {
487    // rates are sorted by epoch in descending order.
488    let rates = exchange_rates.clone().dropping(1);
489    let rates_next = exchange_rates.dropping_back(1);
490    let apys = rates
491        .zip(rates_next)
492        .filter_map(|(er, er_next)| {
493            let apy = calculate_apy(er, er_next);
494            (apy > 0.0).then_some(apy)
495        })
496        .take(90)
497        .collect::<Vec<_>>();
498
499    if apys.is_empty() {
500        // not enough data points
501        0.0
502    } else {
503        Data::new(apys).median()
504    }
505}
506
507/// Calculate the APY by the exchange rate of two consecutive epochs
508/// (`er`, `er_next`).
509///
510/// The formula used is `APY_e = (er.rate - er_next.rate) / er.rate * 365`
511fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
512    ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0
513}
514
515fn stake_status(
516    epoch: u64,
517    activation_epoch: u64,
518    principal: u64,
519    exists: bool,
520    current_rate: Option<&PoolTokenExchangeRate>,
521    rate_table: &ValidatorExchangeRates,
522) -> StakeStatus {
523    if !exists {
524        StakeStatus::Unstaked
525    } else if epoch >= activation_epoch {
526        // TODO: use dev_inspect to call a move function to get the estimated reward
527        let estimated_reward = if let Some(current_rate) = current_rate {
528            let stake_rate = rate_table
529                .rates
530                .iter()
531                .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
532                .unwrap_or_default();
533            let estimated_reward =
534                ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
535            max(0, estimated_reward.round() as u64)
536        } else {
537            0
538        };
539        StakeStatus::Active { estimated_reward }
540    } else {
541        StakeStatus::Pending
542    }
543}
544
545/// Cached exchange rates for validators for the given epoch, the cache size is
546/// 1, it will be cleared when the epoch changes. rates are in descending order
547/// by epoch.
548#[cached(
549    type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
550    create = "{ SizedCache::with_size(1) }",
551    convert = "{ _current_epoch }",
552    result = true
553)]
554async fn exchange_rates(
555    state: &Arc<dyn StateRead>,
556    _current_epoch: EpochId,
557) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
558    Ok(active_validators_exchange_rates(state)?
559        .into_iter()
560        .chain(inactive_validators_exchange_rates(state)?.into_iter())
561        .collect())
562}
563
// `cached` stores results under the input key -- `current_epoch`.
// `exchange_rates` is not a pure function: it also depends on `state`,
// which `cached` knows nothing about.
// During normal node operation this causes no issues.
// Tests that run several different networks, however, would receive
// incorrect/outdated cached results in the later calls.
// This function clears the `cached` cache of `exchange_rates`.
#[cfg(msim)]
pub async fn clear_exchange_rates_cache_for_testing() {
    use cached::Cached;
    // Nothing to clear while the lazily created cache has not been initialized.
    let Some(mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) else {
        return;
    };
    mutex.lock().await.cache_clear();
}
579
580/// Get validator exchange rates
581fn validator_exchange_rates(
582    state: &Arc<dyn StateRead>,
583    tables: Vec<ValidatorTable>,
584) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
585    if tables.is_empty() {
586        return Ok(vec![]);
587    };
588
589    let mut exchange_rates = vec![];
590    // Get exchange rates for each validator
591    for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
592        let mut rates = state
593            .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
594            .into_iter()
595            .map(|(_object_id, df)| {
596                let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
597                    IotaError::ObjectDeserialization {
598                        error: e.to_string(),
599                    }
600                })?;
601
602                let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
603                    &state.get_object_store().as_ref(),
604                    exchange_rates_id,
605                    &epoch,
606                )?;
607
608                Ok::<_, IotaError>((epoch, exchange_rate))
609            })
610            .collect::<Result<Vec<_>, _>>()?;
611
612        // Rates for some epochs might be missing due to safe mode, we need to backfill
613        // them.
614        rates = backfill_rates(rates);
615
616        exchange_rates.push(ValidatorExchangeRates {
617            address,
618            pool_id,
619            active,
620            rates,
621        });
622    }
623
624    Ok(exchange_rates)
625}
626
627/// Check for validators in the `Active` state and get its exchange rate
628fn active_validators_exchange_rates(
629    state: &Arc<dyn StateRead>,
630) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
631    let system_state_summary = IotaSystemStateSummaryV2::try_from(
632        state.get_system_state()?.into_iota_system_state_summary(),
633    )?;
634
635    let tables = system_state_summary
636        .active_validators
637        .into_iter()
638        .map(|validator| {
639            (
640                validator.iota_address,
641                validator.staking_pool_id,
642                validator.exchange_rates_id,
643                validator.exchange_rates_size,
644                true,
645            )
646        })
647        .collect();
648
649    validator_exchange_rates(state, tables)
650}
651
652/// Check for validators in the `Inactive` state and get its exchange rate
653fn inactive_validators_exchange_rates(
654    state: &Arc<dyn StateRead>,
655) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
656    let system_state_summary = IotaSystemStateSummaryV2::try_from(
657        state.get_system_state()?.into_iota_system_state_summary(),
658    )?;
659
660    let tables = validator_summary_from_system_state(
661        state,
662        system_state_summary.inactive_pools_id,
663        system_state_summary.inactive_pools_size,
664        |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
665    )?;
666
667    validator_exchange_rates(state, tables)
668}
669
670/// Check for validators in the `Pending` state and get its exchange rate. For
671/// these validators, their exchange rates should not be cached as their state
672/// can occur during an epoch or across multiple ones. In contrast, exchange
673/// rates for `Active` and `Inactive` validators can be cached, as their state
674/// changes only at epoch change.
675fn pending_validators_exchange_rate(
676    state: &Arc<dyn StateRead>,
677) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
678    let system_state = state.get_system_state()?;
679    let object_store = state.get_object_store();
680
681    // Try to find for any pending active validator
682    let tables = system_state
683        .get_pending_active_validators(object_store)?
684        .into_iter()
685        .map(|pending_active_validator| {
686            (
687                pending_active_validator.iota_address,
688                pending_active_validator.staking_pool_id,
689                pending_active_validator.exchange_rates_id,
690                pending_active_validator.exchange_rates_size,
691                false,
692            )
693        })
694        .collect::<Vec<ValidatorTable>>();
695
696    validator_exchange_rates(state, tables)
697}
698
699/// Check for validators in the `Candidate` state and get its exchange rate. For
700/// these validators, their exchange rates should not be cached as their state
701/// can occur during an epoch or across multiple ones. In contrast, exchange
702/// rates for `Active` and `Inactive` validators can be cached, as their state
703/// changes only at epoch change.
704fn candidate_validators_exchange_rate(
705    state: &Arc<dyn StateRead>,
706) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
707    let system_state_summary = IotaSystemStateSummaryV2::try_from(
708        state.get_system_state()?.into_iota_system_state_summary(),
709    )?;
710
711    // From validator_candidates_id table get validator info using as key its
712    // IotaAddress
713    let tables = validator_summary_from_system_state(
714        state,
715        system_state_summary.validator_candidates_id,
716        system_state_summary.validator_candidates_size,
717        |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
718    )?;
719
720    validator_exchange_rates(state, tables)
721}
722
723/// Fetches validator status information from `StateRead`.
724///
725/// This makes sense for validators not included in `IotaSystemStateSummary`.
726/// `IotaSystemStateSummary` only contains information about `Active`
727/// validators. To retrieve information about `Inactive`, `Candidate`, and
728/// `Pending` validators, we need to access dynamic fields within specific
729/// Move tables.
730///
731/// To retrieve validator status information, this function utilizes the
732/// corresponding `table_id` (an `ObjectID` value) and a `limit` to specify the
733/// number of records to fetch. Both the `table_id` and `limit` can be obtained
734/// from `IotaSystemStateSummary` in the caller. Additionally, keys are
735/// extracted from the table `DynamicFieldInfo` values according to the `key`
736/// closure. This helps in identifying the specific validator within the table.
737///
738/// # Example
739///
740/// ```text
741/// // Get inactive validators
742/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
743/// let _ = validator_summary_from_system_state(
744///     state,
745///     // ID of the object that maps from a staking pool ID to the inactive validator that has that pool as its staking pool.
746///     system_state_summary.inactive_pools_id,
747///     // Number of inactive staking pools.
748///     system_state_summary.inactive_pools_size,
749///     // Extract the `ID` of the `Inactive` validator from the `DynamicFieldInfo` in the `system_state_summary.inactive_pools_id` table
750///     |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
751/// ).unwrap();
752/// ```
753///
754/// # Example
755///
756/// ```text
757/// // Get candidate validators
758/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
759/// let _ = validator_summary_from_system_state(
760///     state,
761///     // ID of the object that stores preactive validators, mapping their addresses to their Validator structs
762///     system_state_summary.validator_candidates_id,
763///     // Number of preactive validators
764///     system_state_summary.validator_candidates_size,
765///     // Extract the `IotaAddress` of the `Candidate` validator from the `DynamicFieldInfo` in the `system_state_summary.validator_candidates_id` table
766///     |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
767/// ).unwrap();
768/// ```
769fn validator_summary_from_system_state<K, F>(
770    state: &Arc<dyn StateRead>,
771    table_id: ObjectID,
772    limit: u64,
773    key: F,
774) -> RpcInterimResult<Vec<ValidatorTable>>
775where
776    F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
777    K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
778{
779    let object_store = state.get_object_store();
780
781    state
782        .get_dynamic_fields(table_id, None, limit as usize)?
783        .into_iter()
784        .map(|(_object_id, df)| {
785            let validator_summary = get_validator_from_table(object_store, table_id, &key(df)?)?;
786
787            Ok((
788                validator_summary.iota_address,
789                validator_summary.staking_pool_id,
790                validator_summary.exchange_rates_id,
791                validator_summary.exchange_rates_size,
792                false,
793            ))
794        })
795        .collect()
796}
797
/// Exchange rates of the staking pool of a single validator.
#[derive(Clone, Debug)]
pub struct ValidatorExchangeRates {
    /// Address of the validator.
    pub address: IotaAddress,
    /// ID of the validator's staking pool.
    pub pool_id: ObjectID,
    /// Whether the validator was taken from the active validator list.
    pub active: bool,
    /// Exchange rate per epoch. When built by `validator_exchange_rates` the
    /// entries are in descending order by epoch.
    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
}
805
806/// Backfill missing rates for some epochs due to safe mode. If a rate is
807/// missing for epoch e, we will use the rate for epoch e-1 to fill it. Rates
808/// returned are in descending order by epoch.
809fn backfill_rates(
810    mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
811) -> Vec<(EpochId, PoolTokenExchangeRate)> {
812    if rates.is_empty() {
813        return rates;
814    }
815    // ensure epochs are processed in increasing order
816    rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
817
818    // Check if there are any gaps in the epochs
819    let (min_epoch, _) = rates.first().expect("rates should not be empty");
820    let (max_epoch, _) = rates.last().expect("rates should not be empty");
821    let expected_len = (max_epoch - min_epoch + 1) as usize;
822    let current_len = rates.len();
823
824    // Only perform backfilling if there are gaps
825    if current_len == expected_len {
826        rates.reverse();
827        return rates;
828    }
829
830    let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
831    let mut missing_rates = Vec::with_capacity(expected_len - current_len);
832    for (epoch_id, rate) in rates {
833        // fill gaps between the last processed epoch and the current one
834        if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
835            for missing_epoch_id in prev_epoch_id + 1..epoch_id {
836                missing_rates.push((missing_epoch_id, prev_rate.clone()));
837            }
838        };
839
840        // append any missing_rates before adding the current epoch.
841        // if empty, nothing gets appended.
842        // if not empty, it will be empty afterwards because it was moved into
843        // filled_rates
844        filled_rates.append(&mut missing_rates);
845        filled_rates.push((epoch_id, rate));
846    }
847    filled_rates.reverse();
848    filled_rates
849}
850
/// Exposes [`GovernanceReadApi`] through the crate's [`IotaRpcModule`]
/// interface.
impl IotaRpcModule for GovernanceReadApi {
    /// Consumes the API object and converts it into an [`RpcModule`] by
    /// delegating to `into_rpc`.
    fn rpc(self) -> RpcModule<Self> {
        self.into_rpc()
    }

    /// Returns the module documentation produced by
    /// [`GovernanceReadApiOpenRpc::module_doc`].
    fn rpc_doc_module() -> Module {
        GovernanceReadApiOpenRpc::module_doc()
    }
}
860
#[cfg(test)]
mod tests {
    use iota_types::iota_system_state::PoolTokenExchangeRate;

    use super::*;

    /// Feeds the recorded exchange rates from the JSON fixture through
    /// `calculate_apys` and checks that every resulting APY stays below 0.25.
    #[test]
    fn calculate_apys_with_outliers() {
        let fixture =
            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates.json").unwrap();
        let rates_by_validator: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
            serde_json::from_reader(fixture).unwrap();

        // Maps each randomly generated address back to the fixture key it
        // stands for, so the printed APYs can be attributed.
        let mut validator_by_address = BTreeMap::new();

        let exchange_rates = rates_by_validator
            .into_iter()
            .map(|(validator, rates)| {
                let address = IotaAddress::random_for_testing_only();
                validator_by_address.insert(address, validator);
                ValidatorExchangeRates {
                    address,
                    pool_id: ObjectID::random(),
                    active: true,
                    rates,
                }
            })
            .collect();

        let computed = calculate_apys(exchange_rates);

        for entry in &computed {
            println!("{}: {}", validator_by_address[&entry.address], entry.apy);
            assert!(entry.apy < 0.25)
        }
    }

    /// An empty input produces an empty output.
    #[test]
    fn test_backfill_rates_empty() {
        assert!(backfill_rates(Vec::new()).is_empty());
    }

    /// Contiguous epochs are only reordered into descending order.
    #[test]
    fn test_backfill_rates_no_gaps() {
        let [rate1, rate2, rate3] = [(100, 100), (200, 220), (300, 330)]
            .map(|(a, b)| PoolTokenExchangeRate::new_for_testing(a, b));
        let shuffled = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];

        let descending: Vec<(u64, PoolTokenExchangeRate)> =
            vec![(3, rate3), (2, rate2), (1, rate1)];
        assert_eq!(backfill_rates(shuffled), descending);
    }

    /// A single entry is returned as-is.
    #[test]
    fn test_backfill_single_rate() {
        let only_rate = PoolTokenExchangeRate::new_for_testing(100, 100);
        assert_eq!(
            backfill_rates(vec![(1, only_rate.clone())]),
            vec![(1, only_rate)]
        );
    }

    /// Several separate one-epoch gaps in unordered input are each filled with
    /// the rate of the preceding epoch.
    #[test]
    fn test_backfill_rates_with_gaps() {
        let [rate1, rate3, rate5] = [(100, 100), (300, 330), (500, 550)]
            .map(|(a, b)| PoolTokenExchangeRate::new_for_testing(a, b));
        let sparse = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];

        let filled = vec![
            (5, rate5),
            (4, rate3.clone()),
            (3, rate3),
            (2, rate1.clone()),
            (1, rate1),
        ];
        assert_eq!(backfill_rates(sparse), filled);
    }

    /// A single missing epoch between two known ones is filled.
    #[test]
    fn test_backfill_rates_missing_middle_epoch() {
        let [rate1, rate3] =
            [(100, 100), (300, 330)].map(|(a, b)| PoolTokenExchangeRate::new_for_testing(a, b));
        let sparse = vec![(1, rate1.clone()), (3, rate3.clone())];

        let filled = vec![(3, rate3), (2, rate1.clone()), (1, rate1)];
        assert_eq!(backfill_rates(sparse), filled);
    }

    /// A run of consecutive missing epochs is filled with the same earlier
    /// rate.
    #[test]
    fn test_backfill_rates_missing_middle_epochs() {
        let [rate1, rate4] =
            [(100, 100), (400, 440)].map(|(a, b)| PoolTokenExchangeRate::new_for_testing(a, b));
        let sparse = vec![(1, rate1.clone()), (4, rate4.clone())];

        let filled = vec![
            (4, rate4),
            (3, rate1.clone()),
            (2, rate1.clone()),
            (1, rate1),
        ];
        assert_eq!(backfill_rates(sparse), filled);
    }

    /// Unordered input with one gap is sorted and filled.
    #[test]
    fn test_backfill_rates_unordered_input() {
        let [rate1, rate3, rate4] = [(100, 100), (300, 330), (400, 440)]
            .map(|(a, b)| PoolTokenExchangeRate::new_for_testing(a, b));
        let shuffled = vec![(3, rate3.clone()), (1, rate1.clone()), (4, rate4.clone())];

        let filled = vec![(4, rate4), (3, rate3), (2, rate1.clone()), (1, rate1)];
        assert_eq!(backfill_rates(shuffled), filled);
    }
}