Skip to main content

iota_json_rpc/
governance_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11    GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14    DelegatedStake, DelegatedTimelockedStake, IotaCommittee,
15    IotaSystemStateSummary as IotaSystemStateSummarySchema,
16    IotaSystemStateSummaryV1 as IotaSystemStateSummaryV1Schema, Stake, StakeStatus,
17    TimelockedStake, ValidatorApy, ValidatorApys,
18};
19use iota_metrics::spawn_monitored_task;
20use iota_open_rpc::Module;
21use iota_types::{
22    MoveTypeTagTrait,
23    base_types::{IotaAddress, ObjectID},
24    committee::EpochId,
25    dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
26    error::{IotaError, UserInputError},
27    governance::StakedIota,
28    id::ID,
29    iota_serde::BigInt,
30    iota_system_state::{
31        IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
32        iota_system_state_summary::{IotaSystemStateSummaryV1, IotaSystemStateSummaryV2},
33    },
34    object::{Object, ObjectRead},
35    timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44    IotaRpcModule, ObjectProvider,
45    authority_state::StateRead,
46    error::{Error, IotaRpcInputError, RpcInterimResult},
47    logger::FutureWithTracing as _,
48};
49
50type ValidatorTable = (IotaAddress, ObjectID, ObjectID, u64, bool);
51
52#[derive(Clone)]
53pub struct GovernanceReadApi {
54    state: Arc<dyn StateRead>,
55    pub metrics: Arc<JsonRpcMetrics>,
56}
57
58impl GovernanceReadApi {
59    pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60        Self { state, metrics }
61    }
62
63    async fn get_staked_iota(&self, owner: IotaAddress) -> Result<Vec<StakedIota>, Error> {
64        let state = self.state.clone();
65        let result =
66            spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68        self.metrics
69            .get_stake_iota_result_size
70            .observe(result.len() as f64);
71        self.metrics
72            .get_stake_iota_result_size_total
73            .inc_by(result.len() as u64);
74        Ok(result)
75    }
76
77    async fn get_timelocked_staked_iota(
78        &self,
79        owner: IotaAddress,
80    ) -> Result<Vec<TimelockedStakedIota>, Error> {
81        let state = self.state.clone();
82        let result =
83            spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84                .await??;
85
86        self.metrics
87            .get_stake_iota_result_size
88            .observe(result.len() as f64);
89        self.metrics
90            .get_stake_iota_result_size_total
91            .inc_by(result.len() as u64);
92        Ok(result)
93    }
94
95    async fn get_stakes_by_ids(
96        &self,
97        staked_iota_ids: Vec<ObjectID>,
98    ) -> Result<Vec<DelegatedStake>, Error> {
99        let state = self.state.clone();
100        let stakes_read = spawn_monitored_task!(async move {
101            staked_iota_ids
102                .iter()
103                .map(|id| state.get_object_read(id))
104                .collect::<Result<Vec<_>, _>>()
105        })
106        .await??;
107
108        if stakes_read.is_empty() {
109            return Ok(vec![]);
110        }
111
112        let stakes: Vec<(StakedIota, bool)> = self
113            .stakes_with_status(stakes_read.into_iter())
114            .await?
115            .into_iter()
116            .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117            .collect::<Result<_, _>>()?;
118
119        self.get_delegated_stakes(stakes).await
120    }
121
122    async fn get_stakes(&self, owner: IotaAddress) -> Result<Vec<DelegatedStake>, Error> {
123        let timer = self.metrics.get_stake_iota_latency.start_timer();
124        let stakes = self.get_staked_iota(owner).await?;
125        if stakes.is_empty() {
126            return Ok(vec![]);
127        }
128        drop(timer);
129
130        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132        let self_clone = self.clone();
133        spawn_monitored_task!(
134            self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135        )
136        .await?
137    }
138
139    async fn get_timelocked_stakes_by_ids(
140        &self,
141        timelocked_staked_iota_ids: Vec<ObjectID>,
142    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143        let state = self.state.clone();
144        let stakes_read = spawn_monitored_task!(async move {
145            timelocked_staked_iota_ids
146                .iter()
147                .map(|id| state.get_object_read(id))
148                .collect::<Result<Vec<_>, _>>()
149        })
150        .await??;
151
152        if stakes_read.is_empty() {
153            return Ok(vec![]);
154        }
155
156        let stakes: Vec<(TimelockedStakedIota, bool)> = self
157            .stakes_with_status(stakes_read.into_iter())
158            .await?
159            .into_iter()
160            .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161            .collect::<Result<_, _>>()?;
162
163        self.get_delegated_timelocked_stakes(stakes).await
164    }
165
166    async fn get_timelocked_stakes(
167        &self,
168        owner: IotaAddress,
169    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170        let timer = self.metrics.get_stake_iota_latency.start_timer();
171        let stakes = self.get_timelocked_staked_iota(owner).await?;
172        if stakes.is_empty() {
173            return Ok(vec![]);
174        }
175        drop(timer);
176
177        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179        let self_clone = self.clone();
180        spawn_monitored_task!(
181            self_clone
182                .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183        )
184        .await?
185    }
186
187    async fn get_delegated_stakes(
188        &self,
189        stakes: Vec<(StakedIota, bool)>,
190    ) -> Result<Vec<DelegatedStake>, Error> {
191        let pools = stakes.into_iter().fold(
192            BTreeMap::<_, Vec<_>>::new(),
193            |mut pools, (stake, exists)| {
194                pools
195                    .entry(stake.pool_id())
196                    .or_default()
197                    .push((stake, exists));
198                pools
199            },
200        );
201
202        let system_state = self.get_system_state()?;
203        let system_state_summary = IotaSystemStateSummaryV2::try_from(
204            system_state.clone().into_iota_system_state_summary(),
205        )?;
206
207        let rates = exchange_rates(&self.state, system_state_summary.epoch)
208            .await?
209            .into_iter()
210            // Try to find for any candidate validator exchange rate
211            .chain(candidate_validators_exchange_rate(&self.state)?)
212            // Try to find for any pending validator exchange rate
213            .chain(pending_validators_exchange_rate(&self.state)?)
214            .map(|rates| (rates.pool_id, rates))
215            .collect::<BTreeMap<_, _>>();
216
217        let mut delegated_stakes = vec![];
218        for (pool_id, stakes) in pools {
219            // Rate table and rate can be null when the pool is not active
220            let rate_table = rates.get(&pool_id).ok_or_else(|| {
221                IotaRpcInputError::GenericNotFound(format!(
222                    "Cannot find rates for staking pool {pool_id}"
223                ))
224            })?;
225            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227            let mut delegations = vec![];
228            for (stake, exists) in stakes {
229                let status = stake_status(
230                    system_state_summary.epoch,
231                    stake.activation_epoch(),
232                    stake.principal(),
233                    exists,
234                    current_rate,
235                    rate_table,
236                );
237                delegations.push(Stake {
238                    staked_iota_id: stake.id(),
239                    // TODO: this might change when we implement warm up period.
240                    stake_request_epoch: stake.activation_epoch() - 1,
241                    stake_active_epoch: stake.activation_epoch(),
242                    principal: stake.principal(),
243                    status,
244                })
245            }
246            delegated_stakes.push(DelegatedStake {
247                validator_address: rate_table.address,
248                staking_pool: pool_id,
249                stakes: delegations,
250            })
251        }
252        Ok(delegated_stakes)
253    }
254
255    async fn get_delegated_timelocked_stakes(
256        &self,
257        stakes: Vec<(TimelockedStakedIota, bool)>,
258    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259        let pools = stakes.into_iter().fold(
260            BTreeMap::<_, Vec<_>>::new(),
261            |mut pools, (stake, exists)| {
262                pools
263                    .entry(stake.pool_id())
264                    .or_default()
265                    .push((stake, exists));
266                pools
267            },
268        );
269
270        let system_state = self.get_system_state()?;
271        let system_state_summary = IotaSystemStateSummaryV2::try_from(
272            system_state.clone().into_iota_system_state_summary(),
273        )?;
274
275        let rates = exchange_rates(&self.state, system_state_summary.epoch)
276            .await?
277            .into_iter()
278            // Try to find for any candidate validator exchange rate
279            .chain(candidate_validators_exchange_rate(&self.state)?)
280            // Try to find for any pending validator exchange rate
281            .chain(pending_validators_exchange_rate(&self.state)?)
282            .map(|rates| (rates.pool_id, rates))
283            .collect::<BTreeMap<_, _>>();
284
285        let mut delegated_stakes = vec![];
286        for (pool_id, stakes) in pools {
287            // Rate table and rate can be null when the pool is not active
288            let rate_table = rates.get(&pool_id).ok_or_else(|| {
289                IotaRpcInputError::GenericNotFound(format!(
290                    "Cannot find rates for staking pool {pool_id}"
291                ))
292            })?;
293            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
294
295            let mut delegations = vec![];
296            for (stake, exists) in stakes {
297                let status = stake_status(
298                    system_state_summary.epoch,
299                    stake.activation_epoch(),
300                    stake.principal(),
301                    exists,
302                    current_rate,
303                    rate_table,
304                );
305                delegations.push(TimelockedStake {
306                    timelocked_staked_iota_id: stake.id(),
307                    // TODO: this might change when we implement warm up period.
308                    stake_request_epoch: stake.activation_epoch() - 1,
309                    stake_active_epoch: stake.activation_epoch(),
310                    principal: stake.principal(),
311                    status,
312                    expiration_timestamp_ms: stake.expiration_timestamp_ms(),
313                    label: stake.label().clone(),
314                })
315            }
316            delegated_stakes.push(DelegatedTimelockedStake {
317                validator_address: rate_table.address,
318                staking_pool: pool_id,
319                stakes: delegations,
320            })
321        }
322        Ok(delegated_stakes)
323    }
324
325    async fn stakes_with_status(
326        &self,
327        iter: impl Iterator<Item = ObjectRead>,
328    ) -> Result<Vec<(Object, bool)>, Error> {
329        let mut stakes = vec![];
330
331        for stake in iter {
332            match stake {
333                ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
334                ObjectRead::Deleted(object_ref) => {
335                    let Some(o) = self
336                        .state
337                        .find_object_lt_or_eq_version(
338                            &object_ref.object_id,
339                            &object_ref.version.previous().unwrap(),
340                        )
341                        .await?
342                    else {
343                        Err(IotaRpcInputError::UserInput(
344                            UserInputError::ObjectNotFound {
345                                object_id: object_ref.object_id,
346                                version: None,
347                            },
348                        ))?
349                    };
350                    stakes.push((o, false));
351                }
352                ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
353                    UserInputError::ObjectNotFound {
354                        object_id: id,
355                        version: None,
356                    },
357                ))?,
358            }
359        }
360
361        Ok(stakes)
362    }
363
364    fn get_system_state(&self) -> Result<IotaSystemState, Error> {
365        Ok(self.state.get_system_state()?)
366    }
367}
368
369#[async_trait]
370impl GovernanceReadApiServer for GovernanceReadApi {
371    #[instrument(skip(self, staked_iota_ids), fields(staked_iota_ids = staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
372    async fn get_stakes_by_ids(
373        &self,
374        staked_iota_ids: Vec<ObjectID>,
375    ) -> RpcResult<Vec<DelegatedStake>> {
376        self.get_stakes_by_ids(staked_iota_ids).trace().await
377    }
378
379    #[instrument(skip(self, owner), fields(owner = %owner))]
380    async fn get_stakes(&self, owner: IotaAddress) -> RpcResult<Vec<DelegatedStake>> {
381        self.get_stakes(owner).trace().await
382    }
383
384    #[instrument(skip(self, timelocked_staked_iota_ids), fields(timelocked_staked_iota_ids = timelocked_staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
385    async fn get_timelocked_stakes_by_ids(
386        &self,
387        timelocked_staked_iota_ids: Vec<ObjectID>,
388    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
389        self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
390            .trace()
391            .await
392    }
393
394    #[instrument(skip(self, owner), fields(owner = %owner))]
395    async fn get_timelocked_stakes(
396        &self,
397        owner: IotaAddress,
398    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
399        self.get_timelocked_stakes(owner).trace().await
400    }
401
402    #[instrument(skip(self))]
403    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
404        async move {
405            self.state
406                .get_or_latest_committee(epoch)
407                .map(|committee| committee.into())
408                .map_err(Error::from)
409        }
410        .trace()
411        .await
412    }
413
414    #[instrument(skip(self))]
415    async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummarySchema> {
416        async move {
417            Ok(self
418                .state
419                .get_system_state()?
420                .into_iota_system_state_summary()
421                .into())
422        }
423        .trace()
424        .await
425    }
426
427    #[instrument(skip(self))]
428    async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1Schema> {
429        async move {
430            Ok(IotaSystemStateSummaryV1::try_from(
431                self.state
432                    .get_system_state()?
433                    .into_iota_system_state_summary(),
434            )?
435            .into())
436        }
437        .trace()
438        .await
439    }
440
441    #[instrument(skip(self))]
442    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
443        async move {
444            let epoch_store = self.state.load_epoch_store_one_call_per_task();
445            Ok(epoch_store.reference_gas_price().into())
446        }
447        .trace()
448        .await
449    }
450
451    #[instrument(skip(self))]
452    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
453        info!("get_validator_apy");
454        let system_state_summary = self.get_latest_iota_system_state().await?;
455
456        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
457            .await
458            .map_err(|e| error_object_from_rpc(e.into()))?;
459
460        let apys = calculate_apys(exchange_rate_table);
461
462        Ok(ValidatorApys {
463            apys,
464            epoch: system_state_summary.epoch,
465        })
466    }
467}
468
469pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
470    let mut apys = vec![];
471
472    for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
473        let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
474
475        let mean_apy = mean_apy_from_exchange_rates(exchange_rates);
476        apys.push(ValidatorApy {
477            address: rates.address,
478            apy: mean_apy,
479        });
480    }
481    apys
482}
483
484/// Calculate the APY using a 7-epoch moving average.
485///
486/// Returns the Mean by default, but falls back to the Median if outliers are
487/// detected. Outliers are defined as any APY > `MAX_VALID_APY` (100%) or if the
488/// trailing 8th epoch exchange rate is missing. This fallback protects against
489/// skewed results caused by large staking events or the spikes seen after
490/// missing exchange rates.
491pub fn mean_apy_from_exchange_rates<'er>(
492    exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
493) -> f64 {
494    // We set this value after observing the APY of validators in mainnet.
495    const MAX_VALID_APY: f64 = 1.00;
496    const SAMPLES: usize = 7;
497
498    let rates = exchange_rates.clone().dropping(1);
499    let rates_next = exchange_rates.dropping_back(1);
500
501    let mut apys = rates
502        .zip(rates_next)
503        .take(SAMPLES + 1)
504        .map(|(er, er_next)| calculate_apy(er, er_next))
505        .collect::<Vec<_>>();
506
507    // Return 0.0 if there is no data OR if any APY is negative
508    if apys.is_empty() || apys.iter().any(|&apy| apy < 0.0) {
509        return 0.0;
510    }
511    // If any single epoch has outliers (that is APY > MAX_VALID_APY or exchange
512    // rate for epoch e-8 is missing), we switch to Median. Otherwise, we use
513    // the standard Mean.
514    let has_outlier = apys.get(SAMPLES).is_some_and(|&apy| apy <= 0.0)
515        || apys.iter().any(|&apy| apy > MAX_VALID_APY);
516
517    apys.truncate(SAMPLES);
518
519    if has_outlier {
520        Data::new(apys).median()
521    } else {
522        let sum: f64 = apys.iter().sum();
523        sum / SAMPLES as f64
524    }
525}
526
527/// Calculate the APY by the exchange rate of two consecutive epochs
528/// (`er`, `er_next`).
529///
530/// The formula used is `APY_e = (er.rate - er_next.rate) / er.rate * 365`
531fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
532    ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0
533}
534
535fn stake_status(
536    epoch: u64,
537    activation_epoch: u64,
538    principal: u64,
539    exists: bool,
540    current_rate: Option<&PoolTokenExchangeRate>,
541    rate_table: &ValidatorExchangeRates,
542) -> StakeStatus {
543    if !exists {
544        StakeStatus::Unstaked
545    } else if epoch >= activation_epoch {
546        // TODO: use dev_inspect to call a move function to get the estimated reward
547        let estimated_reward = if let Some(current_rate) = current_rate {
548            let stake_rate = rate_table
549                .rates
550                .iter()
551                .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
552                .unwrap_or_default();
553            let estimated_reward =
554                ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
555            max(0, estimated_reward.round() as u64)
556        } else {
557            0
558        };
559        StakeStatus::Active { estimated_reward }
560    } else {
561        StakeStatus::Pending
562    }
563}
564
565/// Cached exchange rates for validators for the given epoch, the cache size is
566/// 1, it will be cleared when the epoch changes. rates are in descending order
567/// by epoch.
568#[cached(
569    type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
570    create = "{ SizedCache::with_size(1) }",
571    convert = "{ _current_epoch }",
572    result = true
573)]
574async fn exchange_rates(
575    state: &Arc<dyn StateRead>,
576    _current_epoch: EpochId,
577) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
578    Ok(active_validators_exchange_rates(state)?
579        .into_iter()
580        .chain(inactive_validators_exchange_rates(state)?.into_iter())
581        .collect())
582}
583
584// `cached` keeps results by the input key -- `current_epoch`.
585// `exchange_rates` is not a pure function, has effects via `state`
586// which `cached` is not aware of.
587// In normal node operation this does not create issues.
588// In tests that run several different networks the latter calls
589// will get incorrect/outdated cached results.
590// This function allows to clear `cached` cache for `exchange_rates`.
591#[cfg(msim)]
592pub async fn clear_exchange_rates_cache_for_testing() {
593    use cached::Cached;
594    if let Some(mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) {
595        let mut cache = mutex.lock().await;
596        cache.cache_clear();
597    }
598}
599
600/// Get validator exchange rates
601fn validator_exchange_rates(
602    state: &Arc<dyn StateRead>,
603    tables: Vec<ValidatorTable>,
604) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
605    if tables.is_empty() {
606        return Ok(vec![]);
607    };
608
609    let mut exchange_rates = vec![];
610    // Get exchange rates for each validator
611    for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
612        let mut rates = state
613            .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
614            .into_iter()
615            .map(|(_object_id, df)| {
616                let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
617                    IotaError::ObjectDeserialization {
618                        error: e.to_string(),
619                    }
620                })?;
621
622                let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
623                    &state.get_object_store().as_ref(),
624                    exchange_rates_id,
625                    &epoch,
626                )?;
627
628                Ok::<_, IotaError>((epoch, exchange_rate))
629            })
630            .collect::<Result<Vec<_>, _>>()?;
631
632        // Rates for some epochs might be missing due to safe mode, we need to backfill
633        // them.
634        rates = backfill_rates(rates);
635
636        exchange_rates.push(ValidatorExchangeRates {
637            address,
638            pool_id,
639            active,
640            rates,
641        });
642    }
643
644    Ok(exchange_rates)
645}
646
647/// Check for validators in the `Active` state and get its exchange rate
648fn active_validators_exchange_rates(
649    state: &Arc<dyn StateRead>,
650) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
651    let system_state_summary = IotaSystemStateSummaryV2::try_from(
652        state.get_system_state()?.into_iota_system_state_summary(),
653    )?;
654
655    let tables = system_state_summary
656        .active_validators
657        .into_iter()
658        .map(|validator| {
659            (
660                validator.iota_address,
661                validator.staking_pool_id,
662                validator.exchange_rates_id,
663                validator.exchange_rates_size,
664                true,
665            )
666        })
667        .collect();
668
669    validator_exchange_rates(state, tables)
670}
671
672/// Check for validators in the `Inactive` state and get its exchange rate
673fn inactive_validators_exchange_rates(
674    state: &Arc<dyn StateRead>,
675) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
676    let system_state_summary = IotaSystemStateSummaryV2::try_from(
677        state.get_system_state()?.into_iota_system_state_summary(),
678    )?;
679
680    let tables = validator_summary_from_system_state(
681        state,
682        system_state_summary.inactive_pools_id,
683        system_state_summary.inactive_pools_size,
684        |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
685        Some(system_state_summary.protocol_version),
686    )?;
687
688    validator_exchange_rates(state, tables)
689}
690
691/// Check for validators in the `Pending` state and get its exchange rate. For
692/// these validators, their exchange rates should not be cached as their state
693/// can occur during an epoch or across multiple ones. In contrast, exchange
694/// rates for `Active` and `Inactive` validators can be cached, as their state
695/// changes only at epoch change.
696fn pending_validators_exchange_rate(
697    state: &Arc<dyn StateRead>,
698) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
699    let system_state = state.get_system_state()?;
700    let object_store = state.get_object_store();
701
702    // Try to find for any pending active validator
703    let tables = system_state
704        .get_pending_active_validators(object_store)?
705        .into_iter()
706        .map(|pending_active_validator| {
707            (
708                pending_active_validator.iota_address,
709                pending_active_validator.staking_pool_id,
710                pending_active_validator.exchange_rates_id,
711                pending_active_validator.exchange_rates_size,
712                false,
713            )
714        })
715        .collect::<Vec<ValidatorTable>>();
716
717    validator_exchange_rates(state, tables)
718}
719
720/// Check for validators in the `Candidate` state and get its exchange rate. For
721/// these validators, their exchange rates should not be cached as their state
722/// can occur during an epoch or across multiple ones. In contrast, exchange
723/// rates for `Active` and `Inactive` validators can be cached, as their state
724/// changes only at epoch change.
725fn candidate_validators_exchange_rate(
726    state: &Arc<dyn StateRead>,
727) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
728    let system_state_summary = IotaSystemStateSummaryV2::try_from(
729        state.get_system_state()?.into_iota_system_state_summary(),
730    )?;
731
732    // From validator_candidates_id table get validator info using as key its
733    // IotaAddress
734    let tables = validator_summary_from_system_state(
735        state,
736        system_state_summary.validator_candidates_id,
737        system_state_summary.validator_candidates_size,
738        |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
739        Some(system_state_summary.protocol_version),
740    )?;
741
742    validator_exchange_rates(state, tables)
743}
744
745/// Fetches validator status information from `StateRead`.
746///
747/// This makes sense for validators not included in `IotaSystemStateSummary`.
748/// `IotaSystemStateSummary` only contains information about `Active`
749/// validators. To retrieve information about `Inactive`, `Candidate`, and
750/// `Pending` validators, we need to access dynamic fields within specific
751/// Move tables.
752///
753/// To retrieve validator status information, this function utilizes the
754/// corresponding `table_id` (an `ObjectID` value) and a `limit` to specify the
755/// number of records to fetch. Both the `table_id` and `limit` can be obtained
756/// from `IotaSystemStateSummary` in the caller. Additionally, keys are
757/// extracted from the table `DynamicFieldInfo` values according to the `key`
758/// closure. This helps in identifying the specific validator within the table.
759///
760/// # Example
761///
762/// ```text
763/// // Get inactive validators
764/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
765/// let _ = validator_summary_from_system_state(
766///     state,
767///     // ID of the object that maps from a staking pool ID to the inactive validator that has that pool as its staking pool.
768///     system_state_summary.inactive_pools_id,
769///     // Number of inactive staking pools.
770///     system_state_summary.inactive_pools_size,
771///     // Extract the `ID` of the `Inactive` validator from the `DynamicFieldInfo` in the `system_state_summary.inactive_pools_id` table
772///     |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
773/// ).unwrap();
774/// ```
775///
776/// # Example
777///
778/// ```text
779/// // Get candidate validators
780/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
781/// let _ = validator_summary_from_system_state(
782///     state,
783///     // ID of the object that stores preactive validators, mapping their addresses to their Validator structs
784///     system_state_summary.validator_candidates_id,
785///     // Number of preactive validators
786///     system_state_summary.validator_candidates_size,
787///     // Extract the `IotaAddress` of the `Candidate` validator from the `DynamicFieldInfo` in the `system_state_summary.validator_candidates_id` table
788///     |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
789/// ).unwrap();
790/// ```
791fn validator_summary_from_system_state<K, F>(
792    state: &Arc<dyn StateRead>,
793    table_id: ObjectID,
794    limit: u64,
795    key: F,
796    protocol_version: Option<u64>,
797) -> RpcInterimResult<Vec<ValidatorTable>>
798where
799    F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
800    K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
801{
802    let object_store = state.get_object_store();
803
804    state
805        .get_dynamic_fields(table_id, None, limit as usize)?
806        .into_iter()
807        .map(|(_object_id, df)| {
808            let validator_summary =
809                get_validator_from_table(object_store, table_id, &key(df)?, protocol_version)?;
810
811            Ok((
812                validator_summary.iota_address,
813                validator_summary.staking_pool_id,
814                validator_summary.exchange_rates_id,
815                validator_summary.exchange_rates_size,
816                false,
817            ))
818        })
819        .collect()
820}
821
822#[derive(Clone, Debug)]
823pub struct ValidatorExchangeRates {
824    pub address: IotaAddress,
825    pub pool_id: ObjectID,
826    pub active: bool,
827    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
828}
829
830/// Backfill missing rates for some epochs due to safe mode. If a rate is
831/// missing for epoch e, we will use the rate for epoch e-1 to fill it. Rates
832/// returned are in descending order by epoch.
833fn backfill_rates(
834    mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
835) -> Vec<(EpochId, PoolTokenExchangeRate)> {
836    if rates.is_empty() {
837        return rates;
838    }
839    // ensure epochs are processed in increasing order
840    rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
841
842    // Check if there are any gaps in the epochs
843    let (min_epoch, _) = rates.first().expect("rates should not be empty");
844    let (max_epoch, _) = rates.last().expect("rates should not be empty");
845    let expected_len = (max_epoch - min_epoch + 1) as usize;
846    let current_len = rates.len();
847
848    // Only perform backfilling if there are gaps
849    if current_len == expected_len {
850        rates.reverse();
851        return rates;
852    }
853
854    let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
855    let mut missing_rates = Vec::with_capacity(expected_len - current_len);
856    for (epoch_id, rate) in rates {
857        // fill gaps between the last processed epoch and the current one
858        if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
859            for missing_epoch_id in prev_epoch_id + 1..epoch_id {
860                missing_rates.push((missing_epoch_id, prev_rate.clone()));
861            }
862        };
863
864        // append any missing_rates before adding the current epoch.
865        // if empty, nothing gets appended.
866        // if not empty, it will be empty afterwards because it was moved into
867        // filled_rates
868        filled_rates.append(&mut missing_rates);
869        filled_rates.push((epoch_id, rate));
870    }
871    filled_rates.reverse();
872    filled_rates
873}
874
875impl IotaRpcModule for GovernanceReadApi {
876    fn rpc(self) -> RpcModule<Self> {
877        self.into_rpc()
878    }
879
880    fn rpc_doc_module() -> Module {
881        GovernanceReadApiOpenRpc::module_doc()
882    }
883}
884
885#[cfg(test)]
886mod tests {
887    use iota_types::iota_system_state::PoolTokenExchangeRate;
888
889    use super::*;
890
891    #[test]
892    fn calculate_apys_with_outliers() {
893        let file =
894            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-test.json")
895                .unwrap();
896        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
897            serde_json::from_reader(file).unwrap();
898
899        let mut address_map = BTreeMap::new();
900
901        let exchange_rates = rates
902            .into_iter()
903            .map(|(validator, rates_vec)| {
904                let address = IotaAddress::random();
905                address_map.insert(address, validator);
906                ValidatorExchangeRates {
907                    address,
908                    pool_id: ObjectID::random(),
909                    active: true,
910                    rates: backfill_rates(rates_vec),
911                }
912            })
913            .collect();
914
915        let apys = calculate_apys(exchange_rates);
916
917        for apy in &apys {
918            println!("{}: {}", address_map[&apy.address], apy.apy);
919            assert!(apy.apy < 0.15)
920        }
921    }
922
923    #[test]
924    fn calculate_apys_without_outliers() {
925        let file =
926            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-feb26.json")
927                .unwrap();
928        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
929            serde_json::from_reader(file).unwrap();
930
931        let mut address_map = BTreeMap::new();
932
933        let exchange_rates = rates
934            .into_iter()
935            .map(|(validator, rates_vec)| {
936                let address = IotaAddress::random();
937                address_map.insert(address, validator);
938                ValidatorExchangeRates {
939                    address,
940                    pool_id: ObjectID::random(),
941                    active: true,
942                    rates: backfill_rates(rates_vec),
943                }
944            })
945            .collect();
946
947        let apys = calculate_apys(exchange_rates);
948
949        for apy in &apys {
950            println!("{}: {}", address_map[&apy.address], apy.apy);
951            assert!(apy.apy < 0.15)
952        }
953    }
954
955    #[test]
956    fn test_backfill_rates_empty() {
957        let rates = vec![];
958        assert_eq!(backfill_rates(rates), vec![]);
959    }
960
961    #[test]
962    fn test_backfill_rates_no_gaps() {
963        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
964        let rate2 = PoolTokenExchangeRate::new_for_testing(200, 220);
965        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
966        let rates = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];
967
968        let expected: Vec<(u64, PoolTokenExchangeRate)> = vec![(3, rate3), (2, rate2), (1, rate1)];
969        assert_eq!(backfill_rates(rates), expected);
970    }
971
972    #[test]
973    fn test_backfill_single_rate() {
974        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
975        let rates = vec![(1, rate1.clone())];
976        let expected = vec![(1, rate1)];
977        assert_eq!(backfill_rates(rates), expected);
978    }
979
980    #[test]
981    fn test_backfill_rates_with_gaps() {
982        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
983        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
984        let rate5 = PoolTokenExchangeRate::new_for_testing(500, 550);
985        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];
986
987        let expected = vec![
988            (5, rate5),
989            (4, rate3.clone()),
990            (3, rate3),
991            (2, rate1.clone()),
992            (1, rate1),
993        ];
994        assert_eq!(backfill_rates(rates), expected);
995    }
996
997    #[test]
998    fn test_backfill_rates_missing_middle_epoch() {
999        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1000        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1001        let rates = vec![(1, rate1.clone()), (3, rate3.clone())];
1002        let expected = vec![(3, rate3), (2, rate1.clone()), (1, rate1)];
1003        assert_eq!(backfill_rates(rates), expected);
1004    }
1005
1006    #[test]
1007    fn test_backfill_rates_missing_middle_epochs() {
1008        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1009        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1010        let rates = vec![(1, rate1.clone()), (4, rate4.clone())];
1011        let expected = vec![
1012            (4, rate4),
1013            (3, rate1.clone()),
1014            (2, rate1.clone()),
1015            (1, rate1),
1016        ];
1017        assert_eq!(backfill_rates(rates), expected);
1018    }
1019
1020    #[test]
1021    fn test_backfill_rates_unordered_input() {
1022        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1023        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1024        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1025        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (4, rate4.clone())];
1026        let expected = vec![(4, rate4), (3, rate3), (2, rate1.clone()), (1, rate1)];
1027        assert_eq!(backfill_rates(rates), expected);
1028    }
1029}