iota_json_rpc/
governance_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11    GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14    DelegatedStake, DelegatedTimelockedStake, IotaCommittee, Stake, StakeStatus, TimelockedStake,
15    ValidatorApy, ValidatorApys,
16};
17use iota_metrics::spawn_monitored_task;
18use iota_open_rpc::Module;
19use iota_types::{
20    MoveTypeTagTrait,
21    base_types::{IotaAddress, ObjectID},
22    committee::EpochId,
23    dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
24    error::{IotaError, UserInputError},
25    governance::StakedIota,
26    id::ID,
27    iota_serde::BigInt,
28    iota_system_state::{
29        IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
30        iota_system_state_summary::{
31            IotaSystemStateSummary, IotaSystemStateSummaryV1, IotaSystemStateSummaryV2,
32        },
33    },
34    object::{Object, ObjectRead},
35    timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44    IotaRpcModule, ObjectProvider,
45    authority_state::StateRead,
46    error::{Error, IotaRpcInputError, RpcInterimResult},
47    logger::FutureWithTracing as _,
48};
49
/// Per-validator lookup data used to load a staking pool's exchange rates:
/// `(validator address, staking pool ID, exchange rates table ID, exchange
/// rates table size, is active)`.
type ValidatorTable = (IotaAddress, ObjectID, ObjectID, u64, bool);
51
/// JSON-RPC handler for the governance read API: stakes, timelocked stakes,
/// committee info, system state, reference gas price and validator APYs.
#[derive(Clone)]
pub struct GovernanceReadApi {
    /// Read access to the node state used to serve requests.
    state: Arc<dyn StateRead>,
    /// Metrics updated while serving requests.
    pub metrics: Arc<JsonRpcMetrics>,
}
57
58impl GovernanceReadApi {
59    pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60        Self { state, metrics }
61    }
62
63    async fn get_staked_iota(&self, owner: IotaAddress) -> Result<Vec<StakedIota>, Error> {
64        let state = self.state.clone();
65        let result =
66            spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68        self.metrics
69            .get_stake_iota_result_size
70            .report(result.len() as u64);
71        self.metrics
72            .get_stake_iota_result_size_total
73            .inc_by(result.len() as u64);
74        Ok(result)
75    }
76
77    async fn get_timelocked_staked_iota(
78        &self,
79        owner: IotaAddress,
80    ) -> Result<Vec<TimelockedStakedIota>, Error> {
81        let state = self.state.clone();
82        let result =
83            spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84                .await??;
85
86        self.metrics
87            .get_stake_iota_result_size
88            .report(result.len() as u64);
89        self.metrics
90            .get_stake_iota_result_size_total
91            .inc_by(result.len() as u64);
92        Ok(result)
93    }
94
95    async fn get_stakes_by_ids(
96        &self,
97        staked_iota_ids: Vec<ObjectID>,
98    ) -> Result<Vec<DelegatedStake>, Error> {
99        let state = self.state.clone();
100        let stakes_read = spawn_monitored_task!(async move {
101            staked_iota_ids
102                .iter()
103                .map(|id| state.get_object_read(id))
104                .collect::<Result<Vec<_>, _>>()
105        })
106        .await??;
107
108        if stakes_read.is_empty() {
109            return Ok(vec![]);
110        }
111
112        let stakes: Vec<(StakedIota, bool)> = self
113            .stakes_with_status(stakes_read.into_iter())
114            .await?
115            .into_iter()
116            .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117            .collect::<Result<_, _>>()?;
118
119        self.get_delegated_stakes(stakes).await
120    }
121
122    async fn get_stakes(&self, owner: IotaAddress) -> Result<Vec<DelegatedStake>, Error> {
123        let timer = self.metrics.get_stake_iota_latency.start_timer();
124        let stakes = self.get_staked_iota(owner).await?;
125        if stakes.is_empty() {
126            return Ok(vec![]);
127        }
128        drop(timer);
129
130        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132        let self_clone = self.clone();
133        spawn_monitored_task!(
134            self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135        )
136        .await?
137    }
138
139    async fn get_timelocked_stakes_by_ids(
140        &self,
141        timelocked_staked_iota_ids: Vec<ObjectID>,
142    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143        let state = self.state.clone();
144        let stakes_read = spawn_monitored_task!(async move {
145            timelocked_staked_iota_ids
146                .iter()
147                .map(|id| state.get_object_read(id))
148                .collect::<Result<Vec<_>, _>>()
149        })
150        .await??;
151
152        if stakes_read.is_empty() {
153            return Ok(vec![]);
154        }
155
156        let stakes: Vec<(TimelockedStakedIota, bool)> = self
157            .stakes_with_status(stakes_read.into_iter())
158            .await?
159            .into_iter()
160            .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161            .collect::<Result<_, _>>()?;
162
163        self.get_delegated_timelocked_stakes(stakes).await
164    }
165
166    async fn get_timelocked_stakes(
167        &self,
168        owner: IotaAddress,
169    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170        let timer = self.metrics.get_stake_iota_latency.start_timer();
171        let stakes = self.get_timelocked_staked_iota(owner).await?;
172        if stakes.is_empty() {
173            return Ok(vec![]);
174        }
175        drop(timer);
176
177        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179        let self_clone = self.clone();
180        spawn_monitored_task!(
181            self_clone
182                .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183        )
184        .await?
185    }
186
187    async fn get_delegated_stakes(
188        &self,
189        stakes: Vec<(StakedIota, bool)>,
190    ) -> Result<Vec<DelegatedStake>, Error> {
191        let pools = stakes.into_iter().fold(
192            BTreeMap::<_, Vec<_>>::new(),
193            |mut pools, (stake, exists)| {
194                pools
195                    .entry(stake.pool_id())
196                    .or_default()
197                    .push((stake, exists));
198                pools
199            },
200        );
201
202        let system_state = self.get_system_state()?;
203        let system_state_summary = IotaSystemStateSummaryV2::try_from(
204            system_state.clone().into_iota_system_state_summary(),
205        )?;
206
207        let rates = exchange_rates(&self.state, system_state_summary.epoch)
208            .await?
209            .into_iter()
210            // Try to find for any candidate validator exchange rate
211            .chain(candidate_validators_exchange_rate(&self.state)?.into_iter())
212            // Try to find for any pending validator exchange rate
213            .chain(pending_validators_exchange_rate(&self.state)?.into_iter())
214            .map(|rates| (rates.pool_id, rates))
215            .collect::<BTreeMap<_, _>>();
216
217        let mut delegated_stakes = vec![];
218        for (pool_id, stakes) in pools {
219            // Rate table and rate can be null when the pool is not active
220            let rate_table = rates.get(&pool_id).ok_or_else(|| {
221                IotaRpcInputError::GenericNotFound(format!(
222                    "Cannot find rates for staking pool {pool_id}"
223                ))
224            })?;
225            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227            let mut delegations = vec![];
228            for (stake, exists) in stakes {
229                let status = stake_status(
230                    system_state_summary.epoch,
231                    stake.activation_epoch(),
232                    stake.principal(),
233                    exists,
234                    current_rate,
235                    rate_table,
236                );
237                delegations.push(Stake {
238                    staked_iota_id: stake.id(),
239                    // TODO: this might change when we implement warm up period.
240                    stake_request_epoch: stake.activation_epoch() - 1,
241                    stake_active_epoch: stake.activation_epoch(),
242                    principal: stake.principal(),
243                    status,
244                })
245            }
246            delegated_stakes.push(DelegatedStake {
247                validator_address: rate_table.address,
248                staking_pool: pool_id,
249                stakes: delegations,
250            })
251        }
252        Ok(delegated_stakes)
253    }
254
255    async fn get_delegated_timelocked_stakes(
256        &self,
257        stakes: Vec<(TimelockedStakedIota, bool)>,
258    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259        let pools = stakes.into_iter().fold(
260            BTreeMap::<_, Vec<_>>::new(),
261            |mut pools, (stake, exists)| {
262                pools
263                    .entry(stake.pool_id())
264                    .or_default()
265                    .push((stake, exists));
266                pools
267            },
268        );
269
270        let system_state = self.get_system_state()?;
271        let system_state_summary = IotaSystemStateSummaryV2::try_from(
272            system_state.clone().into_iota_system_state_summary(),
273        )?;
274
275        let rates = exchange_rates(&self.state, system_state_summary.epoch)
276            .await?
277            .into_iter()
278            .map(|rates| (rates.pool_id, rates))
279            .collect::<BTreeMap<_, _>>();
280
281        let mut delegated_stakes = vec![];
282        for (pool_id, stakes) in pools {
283            // Rate table and rate can be null when the pool is not active
284            let rate_table = rates.get(&pool_id).ok_or_else(|| {
285                IotaRpcInputError::GenericNotFound(format!(
286                    "Cannot find rates for staking pool {pool_id}"
287                ))
288            })?;
289            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
290
291            let mut delegations = vec![];
292            for (stake, exists) in stakes {
293                let status = stake_status(
294                    system_state_summary.epoch,
295                    stake.activation_epoch(),
296                    stake.principal(),
297                    exists,
298                    current_rate,
299                    rate_table,
300                );
301                delegations.push(TimelockedStake {
302                    timelocked_staked_iota_id: stake.id(),
303                    // TODO: this might change when we implement warm up period.
304                    stake_request_epoch: stake.activation_epoch() - 1,
305                    stake_active_epoch: stake.activation_epoch(),
306                    principal: stake.principal(),
307                    status,
308                    expiration_timestamp_ms: stake.expiration_timestamp_ms(),
309                    label: stake.label().clone(),
310                })
311            }
312            delegated_stakes.push(DelegatedTimelockedStake {
313                validator_address: rate_table.address,
314                staking_pool: pool_id,
315                stakes: delegations,
316            })
317        }
318        Ok(delegated_stakes)
319    }
320
321    async fn stakes_with_status(
322        &self,
323        iter: impl Iterator<Item = ObjectRead>,
324    ) -> Result<Vec<(Object, bool)>, Error> {
325        let mut stakes = vec![];
326
327        for stake in iter {
328            match stake {
329                ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
330                ObjectRead::Deleted((object_id, version, _)) => {
331                    let Some(o) = self
332                        .state
333                        .find_object_lt_or_eq_version(&object_id, &version.one_before().unwrap())
334                        .await?
335                    else {
336                        Err(IotaRpcInputError::UserInput(
337                            UserInputError::ObjectNotFound {
338                                object_id,
339                                version: None,
340                            },
341                        ))?
342                    };
343                    stakes.push((o, false));
344                }
345                ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
346                    UserInputError::ObjectNotFound {
347                        object_id: id,
348                        version: None,
349                    },
350                ))?,
351            }
352        }
353
354        Ok(stakes)
355    }
356
357    fn get_system_state(&self) -> Result<IotaSystemState, Error> {
358        Ok(self.state.get_system_state()?)
359    }
360}
361
// JSON-RPC entry points. Most methods delegate to the inherent method of the
// same name on `GovernanceReadApi` and wrap the future with `.trace()`.
#[async_trait]
impl GovernanceReadApiServer for GovernanceReadApi {
    /// Returns the delegated stakes for the given `StakedIota` object IDs.
    #[instrument(skip(self))]
    async fn get_stakes_by_ids(
        &self,
        staked_iota_ids: Vec<ObjectID>,
    ) -> RpcResult<Vec<DelegatedStake>> {
        self.get_stakes_by_ids(staked_iota_ids).trace().await
    }

    /// Returns all delegated stakes owned by `owner`.
    #[instrument(skip(self))]
    async fn get_stakes(&self, owner: IotaAddress) -> RpcResult<Vec<DelegatedStake>> {
        self.get_stakes(owner).trace().await
    }

    /// Returns the delegated timelocked stakes for the given
    /// `TimelockedStakedIota` object IDs.
    #[instrument(skip(self))]
    async fn get_timelocked_stakes_by_ids(
        &self,
        timelocked_staked_iota_ids: Vec<ObjectID>,
    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
        self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
            .trace()
            .await
    }

    /// Returns all delegated timelocked stakes owned by `owner`.
    #[instrument(skip(self))]
    async fn get_timelocked_stakes(
        &self,
        owner: IotaAddress,
    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
        self.get_timelocked_stakes(owner).trace().await
    }

    /// Returns committee information obtained from the state's
    /// `get_or_latest_committee` for the optional `epoch`.
    #[instrument(skip(self))]
    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
        async move {
            self.state
                .get_or_latest_committee(epoch)
                .map(|committee| committee.into())
                .map_err(Error::from)
        }
        .trace()
        .await
    }

    /// Returns the summary of the latest system state as read from the node
    /// state, without converting it to a specific summary version.
    #[instrument(skip(self))]
    async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummary> {
        async move {
            Ok(self
                .state
                .get_system_state()?
                .into_iota_system_state_summary())
        }
        .trace()
        .await
    }

    /// Returns the summary of the latest system state converted to
    /// `IotaSystemStateSummaryV1`; fails if that conversion fails.
    #[instrument(skip(self))]
    async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1> {
        async move {
            Ok(self
                .state
                .get_system_state()?
                .into_iota_system_state_summary()
                .try_into()?)
        }
        .trace()
        .await
    }

    /// Returns the reference gas price taken from the loaded epoch store.
    #[instrument(skip(self))]
    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
        async move {
            let epoch_store = self.state.load_epoch_store_one_call_per_task();
            Ok(epoch_store.reference_gas_price().into())
        }
        .trace()
        .await
    }

    /// Returns the APY of every active validator together with the epoch of
    /// the system state the exchange rates were loaded for.
    #[instrument(skip(self))]
    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
        info!("get_validator_apy");
        let system_state_summary = self.get_latest_iota_system_state().await?;

        // Exchange rates of active and inactive validators, cached by epoch.
        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
            .await
            .map_err(|e| error_object_from_rpc(e.into()))?;

        let apys = calculate_apys(exchange_rate_table);

        Ok(ValidatorApys {
            apys,
            epoch: system_state_summary.epoch,
        })
    }
}
459
460pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
461    let mut apys = vec![];
462
463    for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
464        let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
465
466        let median_apy = median_apy_from_exchange_rates(exchange_rates);
467        apys.push(ValidatorApy {
468            address: rates.address,
469            apy: median_apy,
470        });
471    }
472    apys
473}
474
475/// Calculate the APY for a validator based on the exchange rates of the staking
476/// pool.
477///
478/// The calculation uses the median value of the sample, to filter out
479/// outliers introduced by large staking/unstaking events.
480pub fn median_apy_from_exchange_rates<'er>(
481    exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
482) -> f64 {
483    // rates are sorted by epoch in descending order.
484    let rates = exchange_rates.clone().dropping(1);
485    let rates_next = exchange_rates.dropping_back(1);
486    let apys = rates
487        .zip(rates_next)
488        .filter_map(|(er, er_next)| {
489            let apy = calculate_apy(er, er_next);
490            (apy > 0.0).then_some(apy)
491        })
492        .take(90)
493        .collect::<Vec<_>>();
494
495    if apys.is_empty() {
496        // not enough data points
497        0.0
498    } else {
499        Data::new(apys).median()
500    }
501}
502
503/// Calculate the APY by the exchange rate of two consecutive epochs
504/// (`er`, `er_next`).
505///
506/// The formula used is `APY_e = (er.rate - er_next.rate) / er.rate * 365`
507fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
508    ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0
509}
510
511fn stake_status(
512    epoch: u64,
513    activation_epoch: u64,
514    principal: u64,
515    exists: bool,
516    current_rate: Option<&PoolTokenExchangeRate>,
517    rate_table: &ValidatorExchangeRates,
518) -> StakeStatus {
519    if !exists {
520        StakeStatus::Unstaked
521    } else if epoch >= activation_epoch {
522        // TODO: use dev_inspect to call a move function to get the estimated reward
523        let estimated_reward = if let Some(current_rate) = current_rate {
524            let stake_rate = rate_table
525                .rates
526                .iter()
527                .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
528                .unwrap_or_default();
529            let estimated_reward =
530                ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
531            max(0, estimated_reward.round() as u64)
532        } else {
533            0
534        };
535        StakeStatus::Active { estimated_reward }
536    } else {
537        StakeStatus::Pending
538    }
539}
540
541/// Cached exchange rates for validators for the given epoch, the cache size is
542/// 1, it will be cleared when the epoch changes. rates are in descending order
543/// by epoch.
544#[cached(
545    type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
546    create = "{ SizedCache::with_size(1) }",
547    convert = "{ _current_epoch }",
548    result = true
549)]
550async fn exchange_rates(
551    state: &Arc<dyn StateRead>,
552    _current_epoch: EpochId,
553) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
554    Ok(active_validators_exchange_rates(state)?
555        .into_iter()
556        .chain(inactive_validators_exchange_rates(state)?.into_iter())
557        .collect())
558}
559
// `cached` stores results under the input key -- `current_epoch`.
// `exchange_rates` is not a pure function: its result also depends on
// `state`, which `cached` knows nothing about.
// During normal node operation this is not a problem.
// Tests that run several different networks, however, would get
// incorrect/outdated cached results in later calls.
// This function empties the `cached` cache of `exchange_rates`.
#[cfg(msim)]
pub async fn clear_exchange_rates_cache_for_testing() {
    use cached::Cached;
    // A cache that was never initialized has nothing to clear.
    let Some(cache_mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) else {
        return;
    };
    cache_mutex.lock().await.cache_clear();
}
575
576/// Get validator exchange rates
577fn validator_exchange_rates(
578    state: &Arc<dyn StateRead>,
579    tables: Vec<ValidatorTable>,
580) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
581    if tables.is_empty() {
582        return Ok(vec![]);
583    };
584
585    let mut exchange_rates = vec![];
586    // Get exchange rates for each validator
587    for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
588        let mut rates = state
589            .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
590            .into_iter()
591            .map(|(_object_id, df)| {
592                let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
593                    IotaError::ObjectDeserialization {
594                        error: e.to_string(),
595                    }
596                })?;
597
598                let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
599                    &state.get_object_store().as_ref(),
600                    exchange_rates_id,
601                    &epoch,
602                )?;
603
604                Ok::<_, IotaError>((epoch, exchange_rate))
605            })
606            .collect::<Result<Vec<_>, _>>()?;
607
608        // Rates for some epochs might be missing due to safe mode, we need to backfill
609        // them.
610        rates = backfill_rates(rates);
611
612        exchange_rates.push(ValidatorExchangeRates {
613            address,
614            pool_id,
615            active,
616            rates,
617        });
618    }
619
620    Ok(exchange_rates)
621}
622
623/// Check for validators in the `Active` state and get its exchange rate
624fn active_validators_exchange_rates(
625    state: &Arc<dyn StateRead>,
626) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
627    let system_state_summary = IotaSystemStateSummaryV2::try_from(
628        state.get_system_state()?.into_iota_system_state_summary(),
629    )?;
630
631    let tables = system_state_summary
632        .active_validators
633        .into_iter()
634        .map(|validator| {
635            (
636                validator.iota_address,
637                validator.staking_pool_id,
638                validator.exchange_rates_id,
639                validator.exchange_rates_size,
640                true,
641            )
642        })
643        .collect();
644
645    validator_exchange_rates(state, tables)
646}
647
648/// Check for validators in the `Inactive` state and get its exchange rate
649fn inactive_validators_exchange_rates(
650    state: &Arc<dyn StateRead>,
651) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
652    let system_state_summary = IotaSystemStateSummaryV2::try_from(
653        state.get_system_state()?.into_iota_system_state_summary(),
654    )?;
655
656    let tables = validator_summary_from_system_state(
657        state,
658        system_state_summary.inactive_pools_id,
659        system_state_summary.inactive_pools_size,
660        |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
661    )?;
662
663    validator_exchange_rates(state, tables)
664}
665
666/// Check for validators in the `Pending` state and get its exchange rate. For
667/// these validators, their exchange rates should not be cached as their state
668/// can occur during an epoch or across multiple ones. In contrast, exchange
669/// rates for `Active` and `Inactive` validators can be cached, as their state
670/// changes only at epoch change.
671fn pending_validators_exchange_rate(
672    state: &Arc<dyn StateRead>,
673) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
674    let system_state = state.get_system_state()?;
675    let object_store = state.get_object_store();
676
677    // Try to find for any pending active validator
678    let tables = system_state
679        .get_pending_active_validators(object_store)?
680        .into_iter()
681        .map(|pending_active_validator| {
682            (
683                pending_active_validator.iota_address,
684                pending_active_validator.staking_pool_id,
685                pending_active_validator.exchange_rates_id,
686                pending_active_validator.exchange_rates_size,
687                false,
688            )
689        })
690        .collect::<Vec<ValidatorTable>>();
691
692    validator_exchange_rates(state, tables)
693}
694
695/// Check for validators in the `Candidate` state and get its exchange rate. For
696/// these validators, their exchange rates should not be cached as their state
697/// can occur during an epoch or across multiple ones. In contrast, exchange
698/// rates for `Active` and `Inactive` validators can be cached, as their state
699/// changes only at epoch change.
700fn candidate_validators_exchange_rate(
701    state: &Arc<dyn StateRead>,
702) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
703    let system_state_summary = IotaSystemStateSummaryV2::try_from(
704        state.get_system_state()?.into_iota_system_state_summary(),
705    )?;
706
707    // From validator_candidates_id table get validator info using as key its
708    // IotaAddress
709    let tables = validator_summary_from_system_state(
710        state,
711        system_state_summary.validator_candidates_id,
712        system_state_summary.validator_candidates_size,
713        |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
714    )?;
715
716    validator_exchange_rates(state, tables)
717}
718
719/// Fetches validator status information from `StateRead`.
720///
721/// This makes sense for validators not included in `IotaSystemStateSummary`.
722/// `IotaSystemStateSummary` only contains information about `Active`
723/// validators. To retrieve information about `Inactive`, `Candidate`, and
724/// `Pending` validators, we need to access dynamic fields within specific
725/// Move tables.
726///
727/// To retrieve validator status information, this function utilizes the
728/// corresponding `table_id` (an `ObjectID` value) and a `limit` to specify the
729/// number of records to fetch. Both the `table_id` and `limit` can be obtained
730/// from `IotaSystemStateSummary` in the caller. Additionally, keys are
731/// extracted from the table `DynamicFieldInfo` values according to the `key`
732/// closure. This helps in identifying the specific validator within the table.
733///
734/// # Example
735///
736/// ```text
737/// // Get inactive validators
738/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
739/// let _ = validator_summary_from_system_state(
740///     state,
741///     // ID of the object that maps from a staking pool ID to the inactive validator that has that pool as its staking pool.
742///     system_state_summary.inactive_pools_id,
743///     // Number of inactive staking pools.
744///     system_state_summary.inactive_pools_size,
745///     // Extract the `ID` of the `Inactive` validator from the `DynamicFieldInfo` in the `system_state_summary.inactive_pools_id` table
746///     |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
747/// ).unwrap();
748/// ```
749///
750/// # Example
751///
752/// ```text
753/// // Get candidate validators
754/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
755/// let _ = validator_summary_from_system_state(
756///     state,
757///     // ID of the object that stores preactive validators, mapping their addresses to their Validator structs
758///     system_state_summary.validator_candidates_id,
759///     // Number of preactive validators
760///     system_state_summary.validator_candidates_size,
761///     // Extract the `IotaAddress` of the `Candidate` validator from the `DynamicFieldInfo` in the `system_state_summary.validator_candidates_id` table
762///     |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
763/// ).unwrap();
764/// ```
765fn validator_summary_from_system_state<K, F>(
766    state: &Arc<dyn StateRead>,
767    table_id: ObjectID,
768    limit: u64,
769    key: F,
770) -> RpcInterimResult<Vec<ValidatorTable>>
771where
772    F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
773    K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
774{
775    let object_store = state.get_object_store();
776
777    state
778        .get_dynamic_fields(table_id, None, limit as usize)?
779        .into_iter()
780        .map(|(_object_id, df)| {
781            let validator_summary = get_validator_from_table(object_store, table_id, &key(df)?)?;
782
783            Ok((
784                validator_summary.iota_address,
785                validator_summary.staking_pool_id,
786                validator_summary.exchange_rates_id,
787                validator_summary.exchange_rates_size,
788                false,
789            ))
790        })
791        .collect()
792}
793
/// Exchange rates of a single validator's staking pool.
#[derive(Clone, Debug)]
pub struct ValidatorExchangeRates {
    /// Address of the validator.
    pub address: IotaAddress,
    /// ID of the validator's staking pool.
    pub pool_id: ObjectID,
    /// `true` for validators taken from the active validator set; `false`
    /// for inactive, candidate and pending validators.
    pub active: bool,
    /// Exchange rate per epoch. When built by `validator_exchange_rates` the
    /// entries are backfilled and sorted by epoch in descending order.
    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
}
801
802/// Backfill missing rates for some epochs due to safe mode. If a rate is
803/// missing for epoch e, we will use the rate for epoch e-1 to fill it. Rates
804/// returned are in descending order by epoch.
805fn backfill_rates(
806    mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
807) -> Vec<(EpochId, PoolTokenExchangeRate)> {
808    if rates.is_empty() {
809        return rates;
810    }
811    // ensure epochs are processed in increasing order
812    rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
813
814    // Check if there are any gaps in the epochs
815    let (min_epoch, _) = rates.first().expect("rates should not be empty");
816    let (max_epoch, _) = rates.last().expect("rates should not be empty");
817    let expected_len = (max_epoch - min_epoch + 1) as usize;
818    let current_len = rates.len();
819
820    // Only perform backfilling if there are gaps
821    if current_len == expected_len {
822        rates.reverse();
823        return rates;
824    }
825
826    let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
827    let mut missing_rates = Vec::with_capacity(expected_len - current_len);
828    for (epoch_id, rate) in rates {
829        // fill gaps between the last processed epoch and the current one
830        if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
831            for missing_epoch_id in prev_epoch_id + 1..epoch_id {
832                missing_rates.push((missing_epoch_id, prev_rate.clone()));
833            }
834        };
835
836        // append any missing_rates before adding the current epoch.
837        // if empty, nothing gets appended.
838        // if not empty, it will be empty afterwards because it was moved into
839        // filled_rates
840        filled_rates.append(&mut missing_rates);
841        filled_rates.push((epoch_id, rate));
842    }
843    filled_rates.reverse();
844    filled_rates
845}
846
847impl IotaRpcModule for GovernanceReadApi {
848    fn rpc(self) -> RpcModule<Self> {
849        self.into_rpc()
850    }
851
852    fn rpc_doc_module() -> Module {
853        GovernanceReadApiOpenRpc::module_doc()
854    }
855}
856
#[cfg(test)]
mod tests {
    use iota_types::iota_system_state::PoolTokenExchangeRate;

    use super::*;

    /// Loads exchange rates from the JSON fixture, runs them through
    /// `calculate_apys` and checks that every resulting APY is below 0.25.
    #[test]
    fn calculate_apys_with_outliers() {
        let fixture =
            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates.json").unwrap();
        let rates_by_validator: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
            serde_json::from_reader(fixture).unwrap();

        // Lets the loop below print the fixture key instead of the randomly
        // generated address.
        let mut validator_by_address = BTreeMap::new();

        let exchange_rates = rates_by_validator
            .into_iter()
            .map(|(validator, rates)| {
                let address = IotaAddress::random_for_testing_only();
                validator_by_address.insert(address, validator);
                ValidatorExchangeRates {
                    address,
                    pool_id: ObjectID::random(),
                    active: true,
                    rates,
                }
            })
            .collect();

        let computed = calculate_apys(exchange_rates);

        for entry in &computed {
            println!("{}: {}", validator_by_address[&entry.address], entry.apy);
            assert!(entry.apy < 0.25)
        }
    }

    /// An empty input yields an empty output.
    #[test]
    fn test_backfill_rates_empty() {
        let no_rates = Vec::new();
        assert_eq!(backfill_rates(no_rates), Vec::new());
    }

    /// Contiguous epochs are only reordered into descending order.
    #[test]
    fn test_backfill_rates_no_gaps() {
        let first = PoolTokenExchangeRate::new_for_testing(100, 100);
        let second = PoolTokenExchangeRate::new_for_testing(200, 220);
        let third = PoolTokenExchangeRate::new_for_testing(300, 330);
        let shuffled = vec![(2, second.clone()), (3, third.clone()), (1, first.clone())];

        let descending: Vec<(u64, PoolTokenExchangeRate)> =
            vec![(3, third), (2, second), (1, first)];
        assert_eq!(backfill_rates(shuffled), descending);
    }

    /// A single entry comes back unchanged.
    #[test]
    fn test_backfill_single_rate() {
        let only = PoolTokenExchangeRate::new_for_testing(100, 100);
        let input = vec![(1, only.clone())];
        assert_eq!(backfill_rates(input), vec![(1, only)]);
    }

    /// Two separate one-epoch gaps are each filled from the preceding epoch.
    #[test]
    fn test_backfill_rates_with_gaps() {
        let epoch1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let epoch3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let epoch5 = PoolTokenExchangeRate::new_for_testing(500, 550);
        let input = vec![(3, epoch3.clone()), (1, epoch1.clone()), (5, epoch5.clone())];

        let backfilled = vec![
            (5, epoch5),
            (4, epoch3.clone()),
            (3, epoch3),
            (2, epoch1.clone()),
            (1, epoch1),
        ];
        assert_eq!(backfill_rates(input), backfilled);
    }

    /// One missing epoch between two known ones takes the earlier rate.
    #[test]
    fn test_backfill_rates_missing_middle_epoch() {
        let epoch1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let epoch3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let input = vec![(1, epoch1.clone()), (3, epoch3.clone())];

        let backfilled = vec![(3, epoch3), (2, epoch1.clone()), (1, epoch1)];
        assert_eq!(backfill_rates(input), backfilled);
    }

    /// Several consecutive missing epochs all take the earlier rate.
    #[test]
    fn test_backfill_rates_missing_middle_epochs() {
        let epoch1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let epoch4 = PoolTokenExchangeRate::new_for_testing(400, 440);
        let input = vec![(1, epoch1.clone()), (4, epoch4.clone())];

        let backfilled = vec![
            (4, epoch4),
            (3, epoch1.clone()),
            (2, epoch1.clone()),
            (1, epoch1),
        ];
        assert_eq!(backfill_rates(input), backfilled);
    }

    /// Input order does not matter: the gap is still found and filled.
    #[test]
    fn test_backfill_rates_unordered_input() {
        let epoch1 = PoolTokenExchangeRate::new_for_testing(100, 100);
        let epoch3 = PoolTokenExchangeRate::new_for_testing(300, 330);
        let epoch4 = PoolTokenExchangeRate::new_for_testing(400, 440);
        let input = vec![(3, epoch3.clone()), (1, epoch1.clone()), (4, epoch4.clone())];

        let backfilled = vec![(4, epoch4), (3, epoch3), (2, epoch1.clone()), (1, epoch1)];
        assert_eq!(backfill_rates(input), backfilled);
    }
}