Skip to main content

iota_json_rpc/
governance_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::max, collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{SizedCache, proc_macro::cached};
9use iota_core::authority::AuthorityState;
10use iota_json_rpc_api::{
11    GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics, error_object_from_rpc,
12};
13use iota_json_rpc_types::{
14    DelegatedStake, DelegatedTimelockedStake, IotaCommittee,
15    IotaSystemStateSummary as IotaSystemStateSummarySchema,
16    IotaSystemStateSummaryV1 as IotaSystemStateSummaryV1Schema, Stake, StakeStatus,
17    TimelockedStake, ValidatorApy, ValidatorApys,
18};
19use iota_metrics::spawn_monitored_task;
20use iota_open_rpc::Module;
21use iota_sdk_types::{Address, ObjectId};
22use iota_types::{
23    MoveTypeTagTrait,
24    committee::EpochId,
25    dynamic_field::{DynamicFieldInfo, get_dynamic_field_from_store},
26    error::{IotaError, UserInputError},
27    governance::StakedIota,
28    id::ID,
29    iota_serde::BigInt,
30    iota_system_state::{
31        IotaSystemState, IotaSystemStateTrait, PoolTokenExchangeRate, get_validator_from_table,
32        iota_system_state_summary::{IotaSystemStateSummaryV1, IotaSystemStateSummaryV2},
33    },
34    object::{Object, ObjectRead},
35    timelock::timelocked_staked_iota::TimelockedStakedIota,
36};
37use itertools::Itertools;
38use jsonrpsee::{RpcModule, core::RpcResult};
39use serde::{Serialize, de::DeserializeOwned};
40use statrs::statistics::{Data, Median};
41use tracing::{info, instrument};
42
43use crate::{
44    IotaRpcModule, ObjectProvider,
45    authority_state::StateRead,
46    error::{Error, IotaRpcInputError, RpcInterimResult},
47    logger::FutureWithTracing as _,
48};
49
50type ValidatorTable = (Address, ObjectId, ObjectId, u64, bool);
51
52#[derive(Clone)]
53pub struct GovernanceReadApi {
54    state: Arc<dyn StateRead>,
55    pub metrics: Arc<JsonRpcMetrics>,
56}
57
58impl GovernanceReadApi {
59    pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
60        Self { state, metrics }
61    }
62
63    async fn get_staked_iota(&self, owner: Address) -> Result<Vec<StakedIota>, Error> {
64        let state = self.state.clone();
65        let result =
66            spawn_monitored_task!(async move { state.get_staked_iota(owner).await }).await??;
67
68        self.metrics
69            .get_stake_iota_result_size
70            .observe(result.len() as f64);
71        self.metrics
72            .get_stake_iota_result_size_total
73            .inc_by(result.len() as u64);
74        Ok(result)
75    }
76
77    async fn get_timelocked_staked_iota(
78        &self,
79        owner: Address,
80    ) -> Result<Vec<TimelockedStakedIota>, Error> {
81        let state = self.state.clone();
82        let result =
83            spawn_monitored_task!(async move { state.get_timelocked_staked_iota(owner).await })
84                .await??;
85
86        self.metrics
87            .get_stake_iota_result_size
88            .observe(result.len() as f64);
89        self.metrics
90            .get_stake_iota_result_size_total
91            .inc_by(result.len() as u64);
92        Ok(result)
93    }
94
95    async fn get_stakes_by_ids(
96        &self,
97        staked_iota_ids: Vec<ObjectId>,
98    ) -> Result<Vec<DelegatedStake>, Error> {
99        let state = self.state.clone();
100        let stakes_read = spawn_monitored_task!(async move {
101            staked_iota_ids
102                .iter()
103                .map(|id| state.get_object_read(id))
104                .collect::<Result<Vec<_>, _>>()
105        })
106        .await??;
107
108        if stakes_read.is_empty() {
109            return Ok(vec![]);
110        }
111
112        let stakes: Vec<(StakedIota, bool)> = self
113            .stakes_with_status(stakes_read.into_iter())
114            .await?
115            .into_iter()
116            .map(|(o, b)| StakedIota::try_from(&o).map(|stake| (stake, b)))
117            .collect::<Result<_, _>>()?;
118
119        self.get_delegated_stakes(stakes).await
120    }
121
122    async fn get_stakes(&self, owner: Address) -> Result<Vec<DelegatedStake>, Error> {
123        let timer = self.metrics.get_stake_iota_latency.start_timer();
124        let stakes = self.get_staked_iota(owner).await?;
125        if stakes.is_empty() {
126            return Ok(vec![]);
127        }
128        drop(timer);
129
130        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
131
132        let self_clone = self.clone();
133        spawn_monitored_task!(
134            self_clone.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
135        )
136        .await?
137    }
138
139    async fn get_timelocked_stakes_by_ids(
140        &self,
141        timelocked_staked_iota_ids: Vec<ObjectId>,
142    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
143        let state = self.state.clone();
144        let stakes_read = spawn_monitored_task!(async move {
145            timelocked_staked_iota_ids
146                .iter()
147                .map(|id| state.get_object_read(id))
148                .collect::<Result<Vec<_>, _>>()
149        })
150        .await??;
151
152        if stakes_read.is_empty() {
153            return Ok(vec![]);
154        }
155
156        let stakes: Vec<(TimelockedStakedIota, bool)> = self
157            .stakes_with_status(stakes_read.into_iter())
158            .await?
159            .into_iter()
160            .map(|(o, b)| TimelockedStakedIota::try_from(&o).map(|stake| (stake, b)))
161            .collect::<Result<_, _>>()?;
162
163        self.get_delegated_timelocked_stakes(stakes).await
164    }
165
166    async fn get_timelocked_stakes(
167        &self,
168        owner: Address,
169    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
170        let timer = self.metrics.get_stake_iota_latency.start_timer();
171        let stakes = self.get_timelocked_staked_iota(owner).await?;
172        if stakes.is_empty() {
173            return Ok(vec![]);
174        }
175        drop(timer);
176
177        let _timer = self.metrics.get_delegated_iota_latency.start_timer();
178
179        let self_clone = self.clone();
180        spawn_monitored_task!(
181            self_clone
182                .get_delegated_timelocked_stakes(stakes.into_iter().map(|s| (s, true)).collect())
183        )
184        .await?
185    }
186
187    async fn get_delegated_stakes(
188        &self,
189        stakes: Vec<(StakedIota, bool)>,
190    ) -> Result<Vec<DelegatedStake>, Error> {
191        let pools = stakes.into_iter().fold(
192            BTreeMap::<_, Vec<_>>::new(),
193            |mut pools, (stake, exists)| {
194                pools
195                    .entry(stake.pool_id())
196                    .or_default()
197                    .push((stake, exists));
198                pools
199            },
200        );
201
202        let system_state = self.get_system_state()?;
203        let system_state_summary = IotaSystemStateSummaryV2::try_from(
204            system_state.clone().into_iota_system_state_summary(),
205        )?;
206
207        let rates = exchange_rates(&self.state, system_state_summary.epoch)
208            .await?
209            .into_iter()
210            // Try to find for any candidate validator exchange rate
211            .chain(candidate_validators_exchange_rate(&self.state)?)
212            // Try to find for any pending validator exchange rate
213            .chain(pending_validators_exchange_rate(&self.state)?)
214            .map(|rates| (rates.pool_id, rates))
215            .collect::<BTreeMap<_, _>>();
216
217        let mut delegated_stakes = vec![];
218        for (pool_id, stakes) in pools {
219            // Rate table and rate can be null when the pool is not active
220            let rate_table = rates.get(&pool_id).ok_or_else(|| {
221                IotaRpcInputError::GenericNotFound(format!(
222                    "Cannot find rates for staking pool {pool_id}"
223                ))
224            })?;
225            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
226
227            let mut delegations = vec![];
228            for (stake, exists) in stakes {
229                let status = stake_status(
230                    system_state_summary.epoch,
231                    stake.activation_epoch(),
232                    stake.principal(),
233                    exists,
234                    current_rate,
235                    rate_table,
236                );
237                delegations.push(Stake {
238                    staked_iota_id: stake.id(),
239                    // TODO: this might change when we implement warm up period.
240                    stake_request_epoch: stake.activation_epoch() - 1,
241                    stake_active_epoch: stake.activation_epoch(),
242                    principal: stake.principal(),
243                    status,
244                })
245            }
246            delegated_stakes.push(DelegatedStake {
247                validator_address: rate_table.address,
248                staking_pool: pool_id,
249                stakes: delegations,
250            })
251        }
252        Ok(delegated_stakes)
253    }
254
255    async fn get_delegated_timelocked_stakes(
256        &self,
257        stakes: Vec<(TimelockedStakedIota, bool)>,
258    ) -> Result<Vec<DelegatedTimelockedStake>, Error> {
259        let pools = stakes.into_iter().fold(
260            BTreeMap::<_, Vec<_>>::new(),
261            |mut pools, (stake, exists)| {
262                pools
263                    .entry(stake.pool_id())
264                    .or_default()
265                    .push((stake, exists));
266                pools
267            },
268        );
269
270        let system_state = self.get_system_state()?;
271        let system_state_summary = IotaSystemStateSummaryV2::try_from(
272            system_state.clone().into_iota_system_state_summary(),
273        )?;
274
275        let rates = exchange_rates(&self.state, system_state_summary.epoch)
276            .await?
277            .into_iter()
278            // Try to find for any candidate validator exchange rate
279            .chain(candidate_validators_exchange_rate(&self.state)?)
280            // Try to find for any pending validator exchange rate
281            .chain(pending_validators_exchange_rate(&self.state)?)
282            .map(|rates| (rates.pool_id, rates))
283            .collect::<BTreeMap<_, _>>();
284
285        let mut delegated_stakes = vec![];
286        for (pool_id, stakes) in pools {
287            // Rate table and rate can be null when the pool is not active
288            let rate_table = rates.get(&pool_id).ok_or_else(|| {
289                IotaRpcInputError::GenericNotFound(format!(
290                    "Cannot find rates for staking pool {pool_id}"
291                ))
292            })?;
293            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
294
295            let mut delegations = vec![];
296            for (stake, exists) in stakes {
297                let status = stake_status(
298                    system_state_summary.epoch,
299                    stake.activation_epoch(),
300                    stake.principal(),
301                    exists,
302                    current_rate,
303                    rate_table,
304                );
305                delegations.push(TimelockedStake {
306                    timelocked_staked_iota_id: stake.id(),
307                    // TODO: this might change when we implement warm up period.
308                    stake_request_epoch: stake.activation_epoch() - 1,
309                    stake_active_epoch: stake.activation_epoch(),
310                    principal: stake.principal(),
311                    status,
312                    expiration_timestamp_ms: stake.expiration_timestamp_ms(),
313                    label: stake.label().clone(),
314                })
315            }
316            delegated_stakes.push(DelegatedTimelockedStake {
317                validator_address: rate_table.address,
318                staking_pool: pool_id,
319                stakes: delegations,
320            })
321        }
322        Ok(delegated_stakes)
323    }
324
325    async fn stakes_with_status(
326        &self,
327        iter: impl Iterator<Item = ObjectRead>,
328    ) -> Result<Vec<(Object, bool)>, Error> {
329        let mut stakes = vec![];
330
331        for stake in iter {
332            match stake {
333                ObjectRead::Exists(_, o, _) => stakes.push((o, true)),
334                ObjectRead::Deleted(object_ref) => {
335                    let Some(o) = self
336                        .state
337                        .find_object_lt_or_eq_version(
338                            &object_ref.object_id,
339                            &object_ref.version.previous().unwrap(),
340                        )
341                        .await?
342                    else {
343                        Err(IotaRpcInputError::UserInput(
344                            UserInputError::ObjectNotFound {
345                                object_id: object_ref.object_id,
346                                version: None,
347                            },
348                        ))?
349                    };
350                    stakes.push((o, false));
351                }
352                ObjectRead::NotExists(id) => Err(IotaRpcInputError::UserInput(
353                    UserInputError::ObjectNotFound {
354                        object_id: id,
355                        version: None,
356                    },
357                ))?,
358            }
359        }
360
361        Ok(stakes)
362    }
363
364    fn get_system_state(&self) -> Result<IotaSystemState, Error> {
365        Ok(self.state.get_system_state()?)
366    }
367}
368
369#[async_trait]
370impl GovernanceReadApiServer for GovernanceReadApi {
371    #[instrument(skip(self, staked_iota_ids), fields(staked_iota_ids = staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
372    async fn get_stakes_by_ids(
373        &self,
374        staked_iota_ids: Vec<ObjectId>,
375    ) -> RpcResult<Vec<DelegatedStake>> {
376        self.get_stakes_by_ids(staked_iota_ids).trace().await
377    }
378
379    #[instrument(skip(self, owner), fields(owner = %owner))]
380    async fn get_stakes(&self, owner: Address) -> RpcResult<Vec<DelegatedStake>> {
381        self.get_stakes(owner).trace().await
382    }
383
384    #[instrument(skip(self, timelocked_staked_iota_ids), fields(timelocked_staked_iota_ids = timelocked_staked_iota_ids.iter().map(|id| id.to_string()).collect::<Vec<String>>().join(", ")))]
385    async fn get_timelocked_stakes_by_ids(
386        &self,
387        timelocked_staked_iota_ids: Vec<ObjectId>,
388    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
389        self.get_timelocked_stakes_by_ids(timelocked_staked_iota_ids)
390            .trace()
391            .await
392    }
393
394    #[instrument(skip(self, owner), fields(owner = %owner))]
395    async fn get_timelocked_stakes(
396        &self,
397        owner: Address,
398    ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
399        self.get_timelocked_stakes(owner).trace().await
400    }
401
402    #[instrument(skip(self))]
403    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
404        async move {
405            self.state
406                .get_or_latest_committee(epoch)
407                .map(|committee| committee.into())
408                .map_err(Error::from)
409        }
410        .trace()
411        .await
412    }
413
414    #[instrument(skip(self))]
415    async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummarySchema> {
416        async move {
417            Ok(self
418                .state
419                .get_system_state()?
420                .into_iota_system_state_summary()
421                .into())
422        }
423        .trace()
424        .await
425    }
426
427    #[instrument(skip(self))]
428    async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1Schema> {
429        async move {
430            Ok(IotaSystemStateSummaryV1::try_from(
431                self.state
432                    .get_system_state()?
433                    .into_iota_system_state_summary(),
434            )?
435            .into())
436        }
437        .trace()
438        .await
439    }
440
441    #[instrument(skip(self))]
442    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
443        async move {
444            let epoch_store = self.state.load_epoch_store_one_call_per_task();
445            Ok(epoch_store.reference_gas_price().into())
446        }
447        .trace()
448        .await
449    }
450
451    #[instrument(skip(self))]
452    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
453        info!("get_validator_apy");
454        let system_state_summary = self.get_latest_iota_system_state().await?;
455
456        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch)
457            .await
458            .map_err(|e| error_object_from_rpc(e.into()))?;
459
460        let apys = calculate_apys(exchange_rate_table);
461
462        Ok(ValidatorApys {
463            apys,
464            epoch: system_state_summary.epoch,
465        })
466    }
467}
468
469pub fn calculate_apys(exchange_rate_table: Vec<ValidatorExchangeRates>) -> Vec<ValidatorApy> {
470    let mut apys = vec![];
471
472    for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
473        let exchange_rates = rates.rates.iter().map(|(_, rate)| rate);
474
475        let mean_apy = mean_apy_from_exchange_rates(exchange_rates);
476        apys.push(ValidatorApy {
477            address: rates.address,
478            apy: mean_apy,
479        });
480    }
481    apys
482}
483
484/// Calculate the APY using a 7-epoch moving average.
485///
486/// Returns the Mean by default, but falls back to the Median if outliers are
487/// detected. Outliers are defined as any APY > `MAX_VALID_APY` (100%) or if the
488/// trailing 8th epoch exchange rate is missing. This fallback protects against
489/// skewed results caused by large staking events or the spikes seen after
490/// missing exchange rates.
491pub fn mean_apy_from_exchange_rates<'er>(
492    exchange_rates: impl DoubleEndedIterator<Item = &'er PoolTokenExchangeRate> + Clone,
493) -> f64 {
494    // We set this value after observing the APY of validators in mainnet.
495    const MAX_VALID_APY: f64 = 1.00;
496    const SAMPLES: usize = 7;
497
498    let rates = exchange_rates.clone().dropping(1);
499    let rates_next = exchange_rates.dropping_back(1);
500
501    let mut apys = rates
502        .zip(rates_next)
503        .take(SAMPLES + 1)
504        .map(|(er, er_next)| calculate_apy(er, er_next))
505        .collect::<Vec<_>>();
506
507    // Return 0.0 if there is no data OR if any APY is negative
508    if apys.is_empty() || apys.iter().any(|&apy| apy < 0.0) {
509        return 0.0;
510    }
511    // If any single epoch has outliers (that is APY > MAX_VALID_APY or exchange
512    // rate for epoch e-8 is missing), we switch to Median. Otherwise, we use
513    // the standard Mean.
514    let has_outlier = apys.get(SAMPLES).is_some_and(|&apy| apy <= 0.0)
515        || apys.iter().any(|&apy| apy > MAX_VALID_APY);
516
517    apys.truncate(SAMPLES);
518
519    if has_outlier {
520        Data::new(apys).median()
521    } else {
522        let sum: f64 = apys.iter().sum();
523        sum / SAMPLES as f64
524    }
525}
526
527/// APY magnitudes below this threshold are treated as exactly zero.
528const APY_DUST_THRESHOLD: f64 = 1e-9;
529
530/// Calculate the APY from the exchange rate of two consecutive epochs
531/// (`er` is the older epoch, `er_next` the newer one).
532///
533/// The formula used is `APY_e = (er.rate - er_next.rate) / er_next.rate * 365`.
534fn calculate_apy(er: &PoolTokenExchangeRate, er_next: &PoolTokenExchangeRate) -> f64 {
535    let apy = ((er.rate() - er_next.rate()) / er_next.rate()) * 365.0;
536    if apy.abs() < APY_DUST_THRESHOLD {
537        0.0
538    } else {
539        apy
540    }
541}
542
543fn stake_status(
544    epoch: u64,
545    activation_epoch: u64,
546    principal: u64,
547    exists: bool,
548    current_rate: Option<&PoolTokenExchangeRate>,
549    rate_table: &ValidatorExchangeRates,
550) -> StakeStatus {
551    if !exists {
552        StakeStatus::Unstaked
553    } else if epoch >= activation_epoch {
554        // TODO: use dev_inspect to call a move function to get the estimated reward
555        let estimated_reward = if let Some(current_rate) = current_rate {
556            let stake_rate = rate_table
557                .rates
558                .iter()
559                .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
560                .unwrap_or_default();
561            let estimated_reward =
562                ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
563            max(0, estimated_reward.round() as u64)
564        } else {
565            0
566        };
567        StakeStatus::Active { estimated_reward }
568    } else {
569        StakeStatus::Pending
570    }
571}
572
573/// Cached exchange rates for validators for the given epoch, the cache size is
574/// 1, it will be cleared when the epoch changes. rates are in descending order
575/// by epoch.
576#[cached(
577    type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
578    create = "{ SizedCache::with_size(1) }",
579    convert = "{ _current_epoch }",
580    result = true
581)]
582async fn exchange_rates(
583    state: &Arc<dyn StateRead>,
584    _current_epoch: EpochId,
585) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
586    Ok(active_validators_exchange_rates(state)?
587        .into_iter()
588        .chain(inactive_validators_exchange_rates(state)?.into_iter())
589        .collect())
590}
591
592// `cached` keeps results by the input key -- `current_epoch`.
593// `exchange_rates` is not a pure function, has effects via `state`
594// which `cached` is not aware of.
595// In normal node operation this does not create issues.
596// In tests that run several different networks the latter calls
597// will get incorrect/outdated cached results.
598// This function allows to clear `cached` cache for `exchange_rates`.
599#[cfg(msim)]
600pub async fn clear_exchange_rates_cache_for_testing() {
601    use cached::Cached;
602    if let Some(mutex) = ::cached::once_cell::sync::Lazy::get(&EXCHANGE_RATES) {
603        let mut cache = mutex.lock().await;
604        cache.cache_clear();
605    }
606}
607
608/// Get validator exchange rates
609fn validator_exchange_rates(
610    state: &Arc<dyn StateRead>,
611    tables: Vec<ValidatorTable>,
612) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
613    if tables.is_empty() {
614        return Ok(vec![]);
615    };
616
617    let mut exchange_rates = vec![];
618    // Get exchange rates for each validator
619    for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
620        let mut rates = state
621            .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
622            .into_iter()
623            .map(|(_object_id, df)| {
624                let epoch: EpochId = bcs::from_bytes(&df.bcs_name).map_err(|e| {
625                    IotaError::ObjectDeserialization {
626                        error: e.to_string(),
627                    }
628                })?;
629
630                let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
631                    &state.get_object_store().as_ref(),
632                    exchange_rates_id,
633                    &epoch,
634                )?;
635
636                Ok::<_, IotaError>((epoch, exchange_rate))
637            })
638            .collect::<Result<Vec<_>, _>>()?;
639
640        // Rates for some epochs might be missing due to safe mode, we need to backfill
641        // them.
642        rates = backfill_rates(rates);
643
644        exchange_rates.push(ValidatorExchangeRates {
645            address,
646            pool_id,
647            active,
648            rates,
649        });
650    }
651
652    Ok(exchange_rates)
653}
654
655/// Check for validators in the `Active` state and get its exchange rate
656fn active_validators_exchange_rates(
657    state: &Arc<dyn StateRead>,
658) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
659    let system_state_summary = IotaSystemStateSummaryV2::try_from(
660        state.get_system_state()?.into_iota_system_state_summary(),
661    )?;
662
663    let tables = system_state_summary
664        .active_validators
665        .into_iter()
666        .map(|validator| {
667            (
668                validator.iota_address,
669                validator.staking_pool_id,
670                validator.exchange_rates_id,
671                validator.exchange_rates_size,
672                true,
673            )
674        })
675        .collect();
676
677    validator_exchange_rates(state, tables)
678}
679
680/// Check for validators in the `Inactive` state and get its exchange rate
681fn inactive_validators_exchange_rates(
682    state: &Arc<dyn StateRead>,
683) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
684    let system_state_summary = IotaSystemStateSummaryV2::try_from(
685        state.get_system_state()?.into_iota_system_state_summary(),
686    )?;
687
688    let tables = validator_summary_from_system_state(
689        state,
690        system_state_summary.inactive_pools_id,
691        system_state_summary.inactive_pools_size,
692        |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
693        Some(system_state_summary.protocol_version),
694    )?;
695
696    validator_exchange_rates(state, tables)
697}
698
699/// Check for validators in the `Pending` state and get its exchange rate. For
700/// these validators, their exchange rates should not be cached as their state
701/// can occur during an epoch or across multiple ones. In contrast, exchange
702/// rates for `Active` and `Inactive` validators can be cached, as their state
703/// changes only at epoch change.
704fn pending_validators_exchange_rate(
705    state: &Arc<dyn StateRead>,
706) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
707    let system_state = state.get_system_state()?;
708    let object_store = state.get_object_store();
709
710    // Try to find for any pending active validator
711    let tables = system_state
712        .get_pending_active_validators(object_store)?
713        .into_iter()
714        .map(|pending_active_validator| {
715            (
716                pending_active_validator.iota_address,
717                pending_active_validator.staking_pool_id,
718                pending_active_validator.exchange_rates_id,
719                pending_active_validator.exchange_rates_size,
720                false,
721            )
722        })
723        .collect::<Vec<ValidatorTable>>();
724
725    validator_exchange_rates(state, tables)
726}
727
728/// Check for validators in the `Candidate` state and get its exchange rate. For
729/// these validators, their exchange rates should not be cached as their state
730/// can occur during an epoch or across multiple ones. In contrast, exchange
731/// rates for `Active` and `Inactive` validators can be cached, as their state
732/// changes only at epoch change.
733fn candidate_validators_exchange_rate(
734    state: &Arc<dyn StateRead>,
735) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
736    let system_state_summary = IotaSystemStateSummaryV2::try_from(
737        state.get_system_state()?.into_iota_system_state_summary(),
738    )?;
739
740    // From validator_candidates_id table get validator info using as key its
741    // Address
742    let tables = validator_summary_from_system_state(
743        state,
744        system_state_summary.validator_candidates_id,
745        system_state_summary.validator_candidates_size,
746        |df| bcs::from_bytes::<Address>(&df.bcs_name).map_err(Into::into),
747        Some(system_state_summary.protocol_version),
748    )?;
749
750    validator_exchange_rates(state, tables)
751}
752
753/// Fetches validator status information from `StateRead`.
754///
755/// This makes sense for validators not included in `IotaSystemStateSummary`.
756/// `IotaSystemStateSummary` only contains information about `Active`
757/// validators. To retrieve information about `Inactive`, `Candidate`, and
758/// `Pending` validators, we need to access dynamic fields within specific
759/// Move tables.
760///
761/// To retrieve validator status information, this function utilizes the
762/// corresponding `table_id` (an `ObjectId` value) and a `limit` to specify the
763/// number of records to fetch. Both the `table_id` and `limit` can be obtained
764/// from `IotaSystemStateSummary` in the caller. Additionally, keys are
765/// extracted from the table `DynamicFieldInfo` values according to the `key`
766/// closure. This helps in identifying the specific validator within the table.
767///
768/// # Example
769///
770/// ```text
771/// // Get inactive validators
772/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
773/// let _ = validator_summary_from_system_state(
774///     state,
775///     // ID of the object that maps from a staking pool ID to the inactive validator that has that pool as its staking pool.
776///     system_state_summary.inactive_pools_id,
777///     // Number of inactive staking pools.
778///     system_state_summary.inactive_pools_size,
779///     // Extract the `ID` of the `Inactive` validator from the `DynamicFieldInfo` in the `system_state_summary.inactive_pools_id` table
780///     |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
781/// ).unwrap();
782/// ```
783///
784/// # Example
785///
786/// ```text
787/// // Get candidate validators
788/// let system_state_summary = state.get_system_state()?.into_iota_system_state_summary();
789/// let _ = validator_summary_from_system_state(
790///     state,
791///     // ID of the object that stores preactive validators, mapping their addresses to their Validator structs
792///     system_state_summary.validator_candidates_id,
793///     // Number of preactive validators
794///     system_state_summary.validator_candidates_size,
795///     // Extract the `Address` of the `Candidate` validator from the `DynamicFieldInfo` in the `system_state_summary.validator_candidates_id` table
796///     |df| bcs::from_bytes::<Address>(&df.bcs_name).map_err(Into::into),
797/// ).unwrap();
798/// ```
799fn validator_summary_from_system_state<K, F>(
800    state: &Arc<dyn StateRead>,
801    table_id: ObjectId,
802    limit: u64,
803    key: F,
804    protocol_version: Option<u64>,
805) -> RpcInterimResult<Vec<ValidatorTable>>
806where
807    F: Fn(DynamicFieldInfo) -> RpcInterimResult<K>,
808    K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug,
809{
810    let object_store = state.get_object_store();
811
812    state
813        .get_dynamic_fields(table_id, None, limit as usize)?
814        .into_iter()
815        .map(|(_object_id, df)| {
816            let validator_summary =
817                get_validator_from_table(object_store, table_id, &key(df)?, protocol_version)?;
818
819            Ok((
820                validator_summary.iota_address,
821                validator_summary.staking_pool_id,
822                validator_summary.exchange_rates_id,
823                validator_summary.exchange_rates_size,
824                false,
825            ))
826        })
827        .collect()
828}
829
830#[derive(Clone, Debug)]
831pub struct ValidatorExchangeRates {
832    pub address: Address,
833    pub pool_id: ObjectId,
834    pub active: bool,
835    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
836}
837
838/// Backfill missing rates for some epochs due to safe mode. If a rate is
839/// missing for epoch e, we will use the rate for epoch e-1 to fill it. Rates
840/// returned are in descending order by epoch.
841fn backfill_rates(
842    mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
843) -> Vec<(EpochId, PoolTokenExchangeRate)> {
844    if rates.is_empty() {
845        return rates;
846    }
847    // ensure epochs are processed in increasing order
848    rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
849
850    // Check if there are any gaps in the epochs
851    let (min_epoch, _) = rates.first().expect("rates should not be empty");
852    let (max_epoch, _) = rates.last().expect("rates should not be empty");
853    let expected_len = (max_epoch - min_epoch + 1) as usize;
854    let current_len = rates.len();
855
856    // Only perform backfilling if there are gaps
857    if current_len == expected_len {
858        rates.reverse();
859        return rates;
860    }
861
862    let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
863    let mut missing_rates = Vec::with_capacity(expected_len - current_len);
864    for (epoch_id, rate) in rates {
865        // fill gaps between the last processed epoch and the current one
866        if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
867            for missing_epoch_id in prev_epoch_id + 1..epoch_id {
868                missing_rates.push((missing_epoch_id, prev_rate.clone()));
869            }
870        };
871
872        // append any missing_rates before adding the current epoch.
873        // if empty, nothing gets appended.
874        // if not empty, it will be empty afterwards because it was moved into
875        // filled_rates
876        filled_rates.append(&mut missing_rates);
877        filled_rates.push((epoch_id, rate));
878    }
879    filled_rates.reverse();
880    filled_rates
881}
882
883impl IotaRpcModule for GovernanceReadApi {
884    fn rpc(self) -> RpcModule<Self> {
885        self.into_rpc()
886    }
887
888    fn rpc_doc_module() -> Module {
889        GovernanceReadApiOpenRpc::module_doc()
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use iota_types::iota_system_state::PoolTokenExchangeRate;
896
897    use super::*;
898
899    #[test]
900    fn calculate_apys_with_outliers() {
901        let file =
902            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-test.json")
903                .unwrap();
904        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
905            serde_json::from_reader(file).unwrap();
906
907        let mut address_map = BTreeMap::new();
908
909        let exchange_rates = rates
910            .into_iter()
911            .map(|(validator, rates_vec)| {
912                let address = Address::random();
913                address_map.insert(address, validator);
914                ValidatorExchangeRates {
915                    address,
916                    pool_id: ObjectId::random(),
917                    active: true,
918                    rates: backfill_rates(rates_vec),
919                }
920            })
921            .collect();
922
923        let apys = calculate_apys(exchange_rates);
924
925        for apy in &apys {
926            println!("{}: {}", address_map[&apy.address], apy.apy);
927            assert!(apy.apy < 0.15)
928        }
929    }
930
931    #[test]
932    fn calculate_apys_without_outliers() {
933        let file =
934            std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates-feb26.json")
935                .unwrap();
936        let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
937            serde_json::from_reader(file).unwrap();
938
939        let mut address_map = BTreeMap::new();
940
941        let exchange_rates = rates
942            .into_iter()
943            .map(|(validator, rates_vec)| {
944                let address = Address::random();
945                address_map.insert(address, validator);
946                ValidatorExchangeRates {
947                    address,
948                    pool_id: ObjectId::random(),
949                    active: true,
950                    rates: backfill_rates(rates_vec),
951                }
952            })
953            .collect();
954
955        let apys = calculate_apys(exchange_rates);
956
957        for apy in &apys {
958            println!("{}: {}", address_map[&apy.address], apy.apy);
959            assert!(apy.apy < 0.15)
960        }
961    }
962
963    #[test]
964    fn calculate_apy_is_not_negative_for_zero_reward_epoch() {
965        // Real mainnet exchange rates for two validators transitioning from
966        // epoch 381 to 382, an epoch in which they earned no rewards. The rate is
967        // therefore unchanged up to integer-truncation dust, so `calculate_apy`
968        // must report an effectively-zero APY (within
969        // `[0, APY_DUST_THRESHOLD)`).
970        let cases = [
971            (
972                (48_913_429_030_426_080u64, 43_331_127_650_932_384u64),
973                (48_641_042_011_532_656u64, 43_089_827_114_043_304u64),
974            ),
975            (
976                (33_370_417_056_337_732u64, 29_578_114_234_284_444u64),
977                (33_370_374_157_145_896u64, 29_578_076_210_270_704u64),
978            ),
979        ];
980
981        for ((i_old, t_old), (i_new, t_new)) in cases {
982            let er = PoolTokenExchangeRate::new_for_testing(i_old, t_old);
983            let er_next = PoolTokenExchangeRate::new_for_testing(i_new, t_new);
984            let apy = calculate_apy(&er, &er_next);
985            assert!(
986                (0.0..APY_DUST_THRESHOLD).contains(&apy),
987                "expected an effectively-zero, non-negative APY, got {apy}"
988            );
989        }
990    }
991
992    #[test]
993    fn test_backfill_rates_empty() {
994        let rates = vec![];
995        assert_eq!(backfill_rates(rates), vec![]);
996    }
997
998    #[test]
999    fn test_backfill_rates_no_gaps() {
1000        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1001        let rate2 = PoolTokenExchangeRate::new_for_testing(200, 220);
1002        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1003        let rates = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];
1004
1005        let expected: Vec<(u64, PoolTokenExchangeRate)> = vec![(3, rate3), (2, rate2), (1, rate1)];
1006        assert_eq!(backfill_rates(rates), expected);
1007    }
1008
1009    #[test]
1010    fn test_backfill_single_rate() {
1011        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1012        let rates = vec![(1, rate1.clone())];
1013        let expected = vec![(1, rate1)];
1014        assert_eq!(backfill_rates(rates), expected);
1015    }
1016
1017    #[test]
1018    fn test_backfill_rates_with_gaps() {
1019        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1020        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1021        let rate5 = PoolTokenExchangeRate::new_for_testing(500, 550);
1022        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];
1023
1024        let expected = vec![
1025            (5, rate5),
1026            (4, rate3.clone()),
1027            (3, rate3),
1028            (2, rate1.clone()),
1029            (1, rate1),
1030        ];
1031        assert_eq!(backfill_rates(rates), expected);
1032    }
1033
1034    #[test]
1035    fn test_backfill_rates_missing_middle_epoch() {
1036        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1037        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1038        let rates = vec![(1, rate1.clone()), (3, rate3.clone())];
1039        let expected = vec![(3, rate3), (2, rate1.clone()), (1, rate1)];
1040        assert_eq!(backfill_rates(rates), expected);
1041    }
1042
1043    #[test]
1044    fn test_backfill_rates_missing_middle_epochs() {
1045        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1046        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1047        let rates = vec![(1, rate1.clone()), (4, rate4.clone())];
1048        let expected = vec![
1049            (4, rate4),
1050            (3, rate1.clone()),
1051            (2, rate1.clone()),
1052            (1, rate1),
1053        ];
1054        assert_eq!(backfill_rates(rates), expected);
1055    }
1056
1057    #[test]
1058    fn test_backfill_rates_unordered_input() {
1059        let rate1 = PoolTokenExchangeRate::new_for_testing(100, 100);
1060        let rate3 = PoolTokenExchangeRate::new_for_testing(300, 330);
1061        let rate4 = PoolTokenExchangeRate::new_for_testing(400, 440);
1062        let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (4, rate4.clone())];
1063        let expected = vec![(4, rate4), (3, rate3), (2, rate1.clone()), (1, rate1)];
1064        assert_eq!(backfill_rates(rates), expected);
1065    }
1066}