iota_json_rpc/
governance_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11    GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14    DelegatedStake, DelegatedTimelockedStake, IotaCommittee, Stake, StakeStatus, TimelockedStake,
15    ValidatorApy, ValidatorApys,
16};
17use iota_metrics::spawn_monitored_task;
18use iota_open_rpc::Module;
19use iota_types::{
20    MoveTypeTagTrait,
21    base_types::{IotaAddress, ObjectID},
22    committee::EpochId,
23    dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
24    error::{IotaError, UserInputError},
25    governance::StakedIota,
26    id::ID,
27    iota_serde::BigInt,
28    iota_system_state::{
29        IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
30        iota_system_state_summary::{
31            IotaSystemStateSummary, IotaSystemStateSummaryV1, IotaSystemStateSummaryV2,
32        },
33    },
34    object::{Object, ObjectRead},
35    timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44    IotaRpcModule, ObjectProvider,
45    authority_state::StateRead,
46    error::{Error, IotaRpcInputError, RpcInterimResult},
47    logger::FutureWithTracing as _,
48};
49
50type ValidatorTable = (IotaAddress, ObjectID, ObjectID, u64, bool);
51
52#[derive(Clone)]
53pub struct GovernanceReadApi {
54    state: Arc<dyn StateRead>,
55    pub metrics: Arc<JsonRpcMetrics>,
56}
57
58impl GovernanceReadApi {
59    pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60        Self { state, metrics }
61    }
62
63    async fn get_staked_iota(&self, owner: IotaAddress) -> Result<Vec<StakedIota>, Error> {
64        let state = self.state.clone();
65        let result =
66            spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68        self.metrics
69            .get_stake_iota_result_size
70            .observe(result.len() as f64);
71        self.metrics
72            .get_stake_iota_result_size_total
73            .inc_by(result.len() as u64);
74        Ok(result)
75    }
76
77    async fn get_timelocked_staked_iota(
78        &self,
79        owner: IotaAddress,
80    ) -> Result<Vec<TimelockedStakedIota>, Error> {
81        let state = self.state.clone();
82        let result =
83            spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84                .await??;
85
86        self.metrics
87            .get_stake_iota_result_size
88            .observe(result.len() as f64);
89        self.metrics
90            .get_stake_iota_result_size_total
91            .inc_by(result.len() as u64);
92        Ok(result)
93    }
94
95    async fn get_stakes_by_ids(
96        &self,
97        staked_iota_ids: Vec<ObjectID>,
98    ) -> Result<Vec<DelegatedStake>, Error> {
99        let state = self.state.clone();
100        let stakes_read = spawn_monitored_task!(async move {
101            staked_iota_ids
102                .iter()
103                .map(|id| state.get_object_read(id))
104                .collect::<Result<Vec<_>, _>>()
105        })
106        .await??;
107
108        if stakes_read.is_empty() {
109            return Ok(vec![]);
110        }
111
112        let stakes: Vec<(StakedIota, bool)> = self
113            .stakes_with_status(stakes_read.into_iter())
114            .await?
115            .into_iter()
116            .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117            .collect::<Result<_, _>>()?;
118
119        self.get_delegated_stakes(stakes).await
120    }
121
122    async fn get_stakes(&self, owner: IotaAddress) -> Result<Vec<DelegatedStake>, Error> {
123        let timer = self.metrics.get_stake_iota_latency.start_timer();
124        let stakes = self.get_staked_iota(owner).await?;
125        if stakes.is_empty() {
126            return Ok(vec![]);
127        }
128        drop(timer);
129
130        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132        let self_clone = self.clone();
133        spawn_monitored_task!(
134            self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135        )
136        .await?
137    }
138
139    async fn get_timelocked_stakes_by_ids(
140        &self,
141        timelocked_staked_iota_ids: Vec<ObjectID>,
142    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143        let state = self.state.clone();
144        let stakes_read = spawn_monitored_task!(async move {
145            timelocked_staked_iota_ids
146                .iter()
147                .map(|id| state.get_object_read(id))
148                .collect::<Result<Vec<_>, _>>()
149        })
150        .await??;
151
152        if stakes_read.is_empty() {
153            return Ok(vec![]);
154        }
155
156        let stakes: Vec<(TimelockedStakedIota, bool)> = self
157            .stakes_with_status(stakes_read.into_iter())
158            .await?
159            .into_iter()
160            .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161            .collect::<Result<_, _>>()?;
162
163        self.get_delegated_timelocked_stakes(stakes).await
164    }
165
166    async fn get_timelocked_stakes(
167        &self,
168        owner: IotaAddress,
169    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170        let timer = self.metrics.get_stake_iota_latency.start_timer();
171        let stakes = self.get_timelocked_staked_iota(owner).await?;
172        if stakes.is_empty() {
173            return Ok(vec![]);
174        }
175        drop(timer);
176
177        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179        let self_clone = self.clone();
180        spawn_monitored_task!(
181            self_clone
182                .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183        )
184        .await?
185    }
186
187    async fn get_delegated_stakes(
188        &self,
189        stakes: Vec<(StakedIota, bool)>,
190    ) -> Result<Vec<DelegatedStake>, Error> {
191        let pools = stakes.into_iter().fold(
192            BTreeMap::<_, Vec<_>>::new(),
193            |mut pools, (stake, exists)| {
194                pools
195                    .entry(stake.pool_id())
196                    .or_default()
197                    .push((stake, exists));
198                pools
199            },
200        );
201
202        let system_state = self.get_system_state()?;
203        let system_state_summary = IotaSystemStateSummaryV2::try_from(
204            system_state.clone().into_iota_system_state_summary(),
205        )?;
206
207        let rates = exchange_rates(&self.state, system_state_summary.epoch)
208            .await?
209            .into_iter()
210            // Try to find for any candidate validator exchange rate
211            .chain(candidate_validators_exchange_rate(&self.state)?.into_iter())
212            // Try to find for any pending validator exchange rate
213            .chain(pending_validators_exchange_rate(&self.state)?.into_iter())
214            .map(|rates| (rates.pool_id, rates))
215            .collect::<BTreeMap<_, _>>();
216
217        let mut delegated_stakes = vec![];
218        for (pool_id, stakes) in pools {
219            // Rate table and rate can be null when the pool is not active
220            let rate_table = rates.get(&pool_id).ok_or_else(|| {
221                IotaRpcInputError::GenericNotFound(format!(
222                    "Cannot find rates for staking pool {pool_id}"
223                ))
224            })?;
225            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227            let mut delegations = vec![];
228            for (stake, exists) in stakes {
229                let status = stake_status(
230                    system_state_summary.epoch,
231                    stake.activation_epoch(),
232                    stake.principal(),
233                    exists,
234                    current_rate,
235                    rate_table,
236                );
237                delegations.push(Stake {
238                    staked_iota_id: stake.id(),
239                    // TODO: this might change when we implement warm up period.
240                    stake_request_epoch: stake.activation_epoch() - 1,
241                    stake_active_epoch: stake.activation_epoch(),
242                    principal: stake.principal(),
243                    status,
244                })
245            }
246            delegated_stakes.push(DelegatedStake {
247                validator_address: rate_table.address,
248                staking_pool: pool_id,
249                stakes: delegations,
250            })
251        }
252        Ok(delegated_stakes)
253    }
254
255    async fn get_delegated_timelocked_stakes(
256        &self,
257        stakes: Vec<(TimelockedStakedIota, bool)>,
258    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259        let pools = stakes.into_iter().fold(
260            BTreeMap::<_, Vec<_>>::new(),
261            |mut pools, (stake, exists)| {
262                pools
263                    .entry(stake.pool_id())
264                    .or_default()
265                    .push((stake, exists));
266                pools
267            },
268        );
269
270        let system_state = self.get_system_state()?;
271        let system_state_summary = IotaSystemStateSummaryV2::try_from(
272            system_state.clone().into_iota_system_state_summary(),
273        )?;
274
275        let rates = exchange_rates(&self.state, system_state_summary.epoch)
276            .await?
277            .into_iter()
278            // Try to find for any candidate validator exchange rate
279            .chain(candidate_validators_exchange_rate(&self.state)?)
280            // Try to find for any pending validator exchange rate
281            .chain(pending_validators_exchange_rate(&self.state)?)
282            .map(|rates| (rates.pool_id, rates))
283            .collect::<BTreeMap<_, _>>();
284
285        let mut delegated_stakes = vec![];
286        for (pool_id, stakes) in pools {
287            // Rate table and rate can be null when the pool is not active
288            let rate_table = rates.get(&pool_id).ok_or_else(|| {
289                IotaRpcInputError::GenericNotFound(format!(
290                    "Cannot find rates for staking pool {pool_id}"
291                ))
292            })?;
293            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
294
295            let mut delegations = vec![];
296            for (stake, exists) in stakes {
297                let status = stake_status(
298                    system_state_summary.epoch,
299                    stake.activation_epoch(),
300                    stake.principal(),
301                    exists,
302                    current_rate,
303                    rate_table,
304                );
305                delegations.push(TimelockedStake {
306                    timelocked_staked_iota_id: stake.id(),
307                    // TODO: this might change when we implement warm up period.
308                    stake_request_epoch: stake.activation_epoch() - 1,
309                    stake_active_epoch: stake.activation_epoch(),
310                    principal: stake.principal(),
311                    status,
312                    expiration_timestamp_ms: stake.expiration_timestamp_ms(),
313                    label: stake.label().clone(),
314                })
315            }
316            delegated_stakes.push(DelegatedTimelockedStake {
317                validator_address: rate_table.address,
318                staking_pool: pool_id,
319                stakes: delegations,
320            })
321        }
322        Ok(delegated_stakes)
323    }
324
325    async fn stakes_with_status(
326        &self,
327        iter: impl Iterator<Item = ObjectRead>,
328    ) -> Result<Vec<(Object, bool)>, Error> {
329        let mut stakes = vec![];
330
331        for stake in iter {
332            match stake {
333                ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
334                ObjectRead::Deleted((object_id, version, _)) => {
335                    let Some(o) = self
336                        .state
337                        .find_object_lt_or_eq_version(&object_id, &version.one_before().unwrap())
338                        .await?
339                    else {
340                        Err(IotaRpcInputError::UserInput(
341                            UserInputError::ObjectNotFound {
342                                object_id,
343                                version: None,
344                            },
345                        ))?
346                    };
347                    stakes.push((o, false));
348                }
349                ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
350                    UserInputError::ObjectNotFound {
351                        object_id: id,
352                        version: None,
353                    },
354                ))?,
355            }
356        }
357
358        Ok(stakes)
359    }
360
361    fn get_system_state(&self) -> Result<IotaSystemState, Error> {
362        Ok(self.state.get_system_state()?)
363    }
364}
365
366#[async_trait]
367impl GovernanceReadApiServer for GovernanceReadApi {
368    #[instrument(skip(self))]
369    async fn get_stakes_by_ids(
370        &self,
371        staked_iota_ids: Vec<ObjectID>,
372    ) -> RpcResult<Vec<DelegatedStake>> {
373        self.get_stakes_by_ids(staked_iota_ids).trace().await
374    }
375
376    #[instrument(skip(self))]
377    async fn get_stakes(&self, owner: IotaAddress) -> RpcResult<Vec<DelegatedStake>> {
378        self.get_stakes(owner).trace().await
379    }
380
381    #[instrument(skip(self))]
382    async fn get_timelocked_stakes_by_ids(
383        &self,
384        timelocked_staked_iota_ids: Vec<ObjectID>,
385    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
386        self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
387            .trace()
388            .await
389    }
390
391    #[instrument(skip(self))]
392    async fn get_timelocked_stakes(
393        &self,
394        owner: IotaAddress,
395    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
396        self.get_timelocked_stakes(owner).trace().await
397    }
398
399    #[instrument(skip(self))]
400    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
401        async move {
402            self.state
403                .get_or_latest_committee(epoch)
404                .map(|committee| committee.into())
405                .map_err(Error::from)
406        }
407        .trace()
408        .await
409    }
410
411    #[instrument(skip(self))]
412    async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummary> {
413        async move {
414            Ok(self
415                .state
416                .get_system_state()?
417                .into_iota_system_state_summary())
418        }
419        .trace()
420        .await
421    }
422
423    #[instrument(skip(self))]
424    async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1> {
425        async move {
426            Ok(self
427                .state
428                .get_system_state()?
429                .into_iota_system_state_summary()
430                .try_into()?)
431        }
432        .trace()
433        .await
434    }
435
436    #[instrument(skip(self))]
437    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
438        async move {
439            let epoch_store = self.state.load_epoch_store_one_call_per_task();
440            Ok(epoch_store.reference_gas_price().into())
441        }
442        .trace()
443        .await
444    }
445
446    #[instrument(skip(self))]
447    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
448        info!("get_validator_apy");
449        let system_state_summary = self.get_latest_iota_system_state().await?;
450
451        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
452            .await
453            .map_err(|e| error_object_from_rpc(e.into()))?;
454
455        let apys = calculate_apys(exchange_rate_table);
456
457        Ok(ValidatorApys {
458            apys,
459            epoch: system_state_summary.epoch,
460        })
461    }
462}
463
464pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
465    let mut apys = vec![];
466
467    for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
468        let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
469
470        let mean_apy = mean_apy_from_exchange_rates(exchange_rates);
471        apys.push(ValidatorApy {
472            address: rates.address,
473            apy: mean_apy,
474        });
475    }
476    apys
477}
478
479/// Calculate the APY using a 7-epoch moving average.
480///
481/// Returns the Mean by default, but falls back to the Median if outliers are
482/// detected. Outliers are defined as any APY > `MAX_VALID_APY` (100%) or if the
483/// trailing 8th epoch exchange rate is missing. This fallback protects against
484/// skewed results caused by large staking events or the spikes seen after
485/// missing exchange rates.
486pub fn mean_apy_from_exchange_rates<'er>(
487    exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
488) -> f64 {
489    // We set this value after observing the APY of validators in mainnet.
490    const MAX_VALID_APY: f64 = 1.00;
491    const SAMPLES: usize = 7;
492
493    let rates = exchange_rates.clone().dropping(1);
494    let rates_next = exchange_rates.dropping_back(1);
495
496    let mut apys = rates
497        .zip(rates_next)
498        .take(SAMPLES + 1)
499        .map(|(er, er_next)| calculate_apy(er, er_next))
500        .collect::<Vec<_>>();
501
502    // Return 0.0 if there is no data OR if any APY is negative
503    if apys.is_empty() || apys.iter().any(|&apy| apy < 0.0) {
504        return 0.0;
505    }
506    // If any single epoch has outliers (that is APY > MAX_VALID_APY or exchange
507    // rate for epoch e-8 is missing), we switch to Median. Otherwise, we use
508    // the standard Mean.
509    let has_outlier = apys.get(SAMPLES).is_some_and(|&apy| apy <= 0.0)
510        || apys.iter().any(|&apy| apy > MAX_VALID_APY);
511
512    apys.truncate(SAMPLES);
513
514    if has_outlier {
515        Data::new(apys).median()
516    } else {
517        let sum: f64 = apys.iter().sum();
518        sum / SAMPLES as f64
519    }
520}
521
522/// Calculate the APY by the exchange rate of two consecutive epochs
523/// (`er`, `er_next`).
524///
525/// The formula used is `APY_e = (er.rate - er_next.rate) / er.rate * 365`
526fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
527    ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0
528}
529
530fn stake_status(
531    epoch: u64,
532    activation_epoch: u64,
533    principal: u64,
534    exists: bool,
535    current_rate: Option<&PoolTokenExchangeRate>,
536    rate_table: &ValidatorExchangeRates,
537) -> StakeStatus {
538    if !exists {
539        StakeStatus::Unstaked
540    } else if epoch >= activation_epoch {
541        // TODO: use dev_inspect to call a move function to get the estimated reward
542        let estimated_reward = if let Some(current_rate) = current_rate {
543            let stake_rate = rate_table
544                .rates
545                .iter()
546                .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
547                .unwrap_or_default();
548            let estimated_reward =
549                ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
550            max(0, estimated_reward.round() as u64)
551        } else {
552            0
553        };
554        StakeStatus::Active { estimated_reward }
555    } else {
556        StakeStatus::Pending
557    }
558}
559
560/// Cached exchange rates for validators for the given epoch, the cache size is
561/// 1, it will be cleared when the epoch changes. rates are in descending order
562/// by epoch.
563#[cached(
564    type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
565    create = "{ SizedCache::with_size(1) }",
566    convert = "{ _current_epoch }",
567    result = true
568)]
569async fn exchange_rates(
570    state: &Arc<dyn StateRead>,
571    _current_epoch: EpochId,
572) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
573    Ok(active_validators_exchange_rates(state)?
574        .into_iter()
575        .chain(inactive_validators_exchange_rates(state)?.into_iter())
576        .collect())
577}
578
579// `cached` keeps results by the input key -- `current_epoch`.
580// `exchange_rates` is not a pure function, has effects via `state`
581// which `cached` is not aware of.
582// In normal node operation this does not create issues.
583// In tests that run several different networks the latter calls
584// will get incorrect/outdated cached results.
585// This function allows to clear `cached` cache for `exchange_rates`.
586#[cfg(msim)]
587pub async fn clear_exchange_rates_cache_for_testing() {
588    use cached::Cached;
589    if let Some(mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) {
590        let mut cache = mutex.lock().await;
591        cache.cache_clear();
592    }
593}
594
595/// Get validator exchange rates
596fn validator_exchange_rates(
597    state: &Arc<dyn StateRead>,
598    tables: Vec<ValidatorTable>,
599) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
600    if tables.is_empty() {
601        return Ok(vec![]);
602    };
603
604    let mut exchange_rates = vec![];
605    // Get exchange rates for each validator
606    for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
607        let mut rates = state
608            .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
609            .into_iter()
610            .map(|(_object_id, df)| {
611                let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
612                    IotaError::ObjectDeserialization {
613                        error: e.to_string(),
614                    }
615                })?;
616
617                let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
618                    &state.get_object_store().as_ref(),
619                    exchange_rates_id,
620                    &epoch,
621                )?;
622
623                Ok::<_, IotaError>((epoch, exchange_rate))
624            })
625            .collect::<Result<Vec<_>, _>>()?;
626
627        // Rates for some epochs might be missing due to safe mode, we need to backfill
628        // them.
629        rates = backfill_rates(rates);
630
631        exchange_rates.push(ValidatorExchangeRates {
632            address,
633            pool_id,
634            active,
635            rates,
636        });
637    }
638
639    Ok(exchange_rates)
640}
641
642/// Check for validators in the `Active` state and get its exchange rate
643fn active_validators_exchange_rates(
644    state: &Arc<dyn StateRead>,
645) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
646    let system_state_summary = IotaSystemStateSummaryV2::try_from(
647        state.get_system_state()?.into_iota_system_state_summary(),
648    )?;
649
650    let tables = system_state_summary
651        .active_validators
652        .into_iter()
653        .map(|validator| {
654            (
655                validator.iota_address,
656                validator.staking_pool_id,
657                validator.exchange_rates_id,
658                validator.exchange_rates_size,
659                true,
660            )
661        })
662        .collect();
663
664    validator_exchange_rates(state, tables)
665}
666
667/// Check for validators in the `Inactive` state and get its exchange rate
668fn inactive_validators_exchange_rates(
669    state: &Arc<dyn StateRead>,
670) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
671    let system_state_summary = IotaSystemStateSummaryV2::try_from(
672        state.get_system_state()?.into_iota_system_state_summary(),
673    )?;
674
675    let tables = validator_summary_from_system_state(
676        state,
677        system_state_summary.inactive_pools_id,
678        system_state_summary.inactive_pools_size,
679        |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
680    )?;
681
682    validator_exchange_rates(state, tables)
683}
684
685/// Check for validators in the `Pending` state and get its exchange rate. For
686/// these validators, their exchange rates should not be cached as their state
687/// can occur during an epoch or across multiple ones. In contrast, exchange
688/// rates for `Active` and `Inactive` validators can be cached, as their state
689/// changes only at epoch change.
690fn pending_validators_exchange_rate(
691    state: &Arc<dyn StateRead>,
692) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
693    let system_state = state.get_system_state()?;
694    let object_store = state.get_object_store();
695
696    // Try to find for any pending active validator
697    let tables = system_state
698        .get_pending_active_validators(object_store)?
699        .into_iter()
700        .map(|pending_active_validator| {
701            (
702                pending_active_validator.iota_address,
703                pending_active_validator.staking_pool_id,
704                pending_active_validator.exchange_rates_id,
705                pending_active_validator.exchange_rates_size,
706                false,
707            )
708        })
709        .collect::<Vec<ValidatorTable>>();
710
711    validator_exchange_rates(state, tables)
712}
713
714/// Check for validators in the `Candidate` state and get its exchange rate. For
715/// these validators, their exchange rates should not be cached as their state
716/// can occur during an epoch or across multiple ones. In contrast, exchange
717/// rates for `Active` and `Inactive` validators can be cached, as their state
718/// changes only at epoch change.
719fn candidate_validators_exchange_rate(
720    state: &Arc<dyn StateRead>,
721) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
722    let system_state_summary = IotaSystemStateSummaryV2::try_from(
723        state.get_system_state()?.into_iota_system_state_summary(),
724    )?;
725
726    // From validator_candidates_id table get validator info using as key its
727    // IotaAddress
728    let tables = validator_summary_from_system_state(
729        state,
730        system_state_summary.validator_candidates_id,
731        system_state_summary.validator_candidates_size,
732        |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
733    )?;
734
735    validator_exchange_rates(state, tables)
736}
737
738/// Fetches validator status information from `StateRead`.
739///
740/// This makes sense for validators not included in `IotaSystemStateSummary`.
741/// `IotaSystemStateSummary` only contains information about `Active`
742/// validators. To retrieve information about `Inactive`, `Candidate`, and
743/// `Pending` validators, we need to access dynamic fields within specific
744/// Move tables.
745///
746/// To retrieve validator status information, this function utilizes the
747/// corresponding `table_id` (an `ObjectID` value) and a `limit` to specify the
748/// number of records to fetch. Both the `table_id` and `limit` can be obtained
749/// from `IotaSystemStateSummary` in the caller. Additionally, keys are
750/// extracted from the table `DynamicFieldInfo` values according to the `key`
751/// closure. This helps in identifying the specific validator within the table.
752///
753/// # Example
754///
755/// ```text
756/// // Get inactive validators
757/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
758/// let _ = validator_summary_from_system_state(
759///     state,
760///     // ID of the object that maps from a staking pool ID to the inactive validator that has that pool as its staking pool.
761///     system_state_summary.inactive_pools_id,
762///     // Number of inactive staking pools.
763///     system_state_summary.inactive_pools_size,
764///     // Extract the `ID` of the `Inactive` validator from the `DynamicFieldInfo` in the `system_state_summary.inactive_pools_id` table
765///     |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
766/// ).unwrap();
767/// ```
768///
769/// # Example
770///
771/// ```text
772/// // Get candidate validators
773/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
774/// let _ = validator_summary_from_system_state(
775///     state,
776///     // ID of the object that stores preactive validators, mapping their addresses to their Validator structs
777///     system_state_summary.validator_candidates_id,
778///     // Number of preactive validators
779///     system_state_summary.validator_candidates_size,
780///     // Extract the `IotaAddress` of the `Candidate` validator from the `DynamicFieldInfo` in the `system_state_summary.validator_candidates_id` table
781///     |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
782/// ).unwrap();
783/// ```
784fn validator_summary_from_system_state<K, F>(
785    state: &Arc<dyn StateRead>,
786    table_id: ObjectID,
787    limit: u64,
788    key: F,
789) -> RpcInterimResult<Vec<ValidatorTable>>
790where
791    F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
792    K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
793{
794    let object_store = state.get_object_store();
795
796    state
797        .get_dynamic_fields(table_id, None, limit as usize)?
798        .into_iter()
799        .map(|(_object_id, df)| {
800            let validator_summary = get_validator_from_table(object_store, table_id, &key(df)?)?;
801
802            Ok((
803                validator_summary.iota_address,
804                validator_summary.staking_pool_id,
805                validator_summary.exchange_rates_id,
806                validator_summary.exchange_rates_size,
807                false,
808            ))
809        })
810        .collect()
811}
812
813#[derive(Clone, Debug)]
814pub struct ValidatorExchangeRates {
815    pub address: IotaAddress,
816    pub pool_id: ObjectID,
817    pub active: bool,
818    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
819}
820
821/// Backfill missing rates for some epochs due to safe mode. If a rate is
822/// missing for epoch e, we will use the rate for epoch e-1 to fill it. Rates
823/// returned are in descending order by epoch.
824fn backfill_rates(
825    mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
826) -> Vec<(EpochId, PoolTokenExchangeRate)> {
827    if rates.is_empty() {
828        return rates;
829    }
830    // ensure epochs are processed in increasing order
831    rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
832
833    // Check if there are any gaps in the epochs
834    let (min_epoch, _) = rates.first().expect("rates should not be empty");
835    let (max_epoch, _) = rates.last().expect("rates should not be empty");
836    let expected_len = (max_epoch - min_epoch + 1) as usize;
837    let current_len = rates.len();
838
839    // Only perform backfilling if there are gaps
840    if current_len == expected_len {
841        rates.reverse();
842        return rates;
843    }
844
845    let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
846    let mut missing_rates = Vec::with_capacity(expected_len - current_len);
847    for (epoch_id, rate) in rates {
848        // fill gaps between the last processed epoch and the current one
849        if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
850            for missing_epoch_id in prev_epoch_id + 1..epoch_id {
851                missing_rates.push((missing_epoch_id, prev_rate.clone()));
852            }
853        };
854
855        // append any missing_rates before adding the current epoch.
856        // if empty, nothing gets appended.
857        // if not empty, it will be empty afterwards because it was moved into
858        // filled_rates
859        filled_rates.append(&mut missing_rates);
860        filled_rates.push((epoch_id, rate));
861    }
862    filled_rates.reverse();
863    filled_rates
864}
865
866impl IotaRpcModule for GovernanceReadApi {
867    fn rpc(self) -> RpcModule<Self> {
868        self.into_rpc()
869    }
870
871    fn rpc_doc_module() -> Module {
872        GovernanceReadApiOpenRpc::module_doc()
873    }
874}
875
876#[cfg(test)]
877mod tests {
878    use iota_types::iota_system_state::PoolTokenExchangeRate;
879
880    use super::*;
881
882    #[test]
883    fn calculate_apys_with_outliers() {
884        let file =
885            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-test.json")
886                .unwrap();
887        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
888            serde_json::from_reader(file).unwrap();
889
890        let mut address_map = BTreeMap::new();
891
892        let exchange_rates = rates
893            .into_iter()
894            .map(|(validator, rates_vec)| {
895                let address = IotaAddress::random_for_testing_only();
896                address_map.insert(address, validator);
897                ValidatorExchangeRates {
898                    address,
899                    pool_id: ObjectID::random(),
900                    active: true,
901                    rates: backfill_rates(rates_vec),
902                }
903            })
904            .collect();
905
906        let apys = calculate_apys(exchange_rates);
907
908        for apy in &apys {
909            println!("{}: {}", address_map[&apy.address], apy.apy);
910            assert!(apy.apy < 0.15)
911        }
912    }
913
914    #[test]
915    fn calculate_apys_without_outliers() {
916        let file =
917            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-feb26.json")
918                .unwrap();
919        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
920            serde_json::from_reader(file).unwrap();
921
922        let mut address_map = BTreeMap::new();
923
924        let exchange_rates = rates
925            .into_iter()
926            .map(|(validator, rates_vec)| {
927                let address = IotaAddress::random_for_testing_only();
928                address_map.insert(address, validator);
929                ValidatorExchangeRates {
930                    address,
931                    pool_id: ObjectID::random(),
932                    active: true,
933                    rates: backfill_rates(rates_vec),
934                }
935            })
936            .collect();
937
938        let apys = calculate_apys(exchange_rates);
939
940        for apy in &apys {
941            println!("{}: {}", address_map[&apy.address], apy.apy);
942            assert!(apy.apy < 0.15)
943        }
944    }
945
946    #[test]
947    fn test_backfill_rates_empty() {
948        let rates = vec![];
949        assert_eq!(backfill_rates(rates), vec![]);
950    }
951
952    #[test]
953    fn test_backfill_rates_no_gaps() {
954        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
955        let rate2 = PoolTokenExchangeRate::new_for_testing(200, 220);
956        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
957        let rates = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];
958
959        let expected: Vec<(u64, PoolTokenExchangeRate)> = vec![(3, rate3), (2, rate2), (1, rate1)];
960        assert_eq!(backfill_rates(rates), expected);
961    }
962
963    #[test]
964    fn test_backfill_single_rate() {
965        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
966        let rates = vec![(1, rate1.clone())];
967        let expected = vec![(1, rate1)];
968        assert_eq!(backfill_rates(rates), expected);
969    }
970
971    #[test]
972    fn test_backfill_rates_with_gaps() {
973        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
974        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
975        let rate5 = PoolTokenExchangeRate::new_for_testing(500, 550);
976        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];
977
978        let expected = vec![
979            (5, rate5),
980            (4, rate3.clone()),
981            (3, rate3),
982            (2, rate1.clone()),
983            (1, rate1),
984        ];
985        assert_eq!(backfill_rates(rates), expected);
986    }
987
988    #[test]
989    fn test_backfill_rates_missing_middle_epoch() {
990        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
991        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
992        let rates = vec![(1, rate1.clone()), (3, rate3.clone())];
993        let expected = vec![(3, rate3), (2, rate1.clone()), (1, rate1)];
994        assert_eq!(backfill_rates(rates), expected);
995    }
996
997    #[test]
998    fn test_backfill_rates_missing_middle_epochs() {
999        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1000        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1001        let rates = vec![(1, rate1.clone()), (4, rate4.clone())];
1002        let expected = vec![
1003            (4, rate4),
1004            (3, rate1.clone()),
1005            (2, rate1.clone()),
1006            (1, rate1),
1007        ];
1008        assert_eq!(backfill_rates(rates), expected);
1009    }
1010
1011    #[test]
1012    fn test_backfill_rates_unordered_input() {
1013        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1014        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1015        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1016        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (4, rate4.clone())];
1017        let expected = vec![(4, rate4), (3, rate3), (2, rate1.clone()), (1, rate1)];
1018        assert_eq!(backfill_rates(rates), expected);
1019    }
1020}