1use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use async_trait::async_trait;
8use cached::{Cached, SizedCache};
9use iota_json_rpc::{IotaRpcModule, governance_api::ValidatorExchangeRates};
10use iota_json_rpc_api::GovernanceReadApiServer;
11use iota_json_rpc_types::{
12 DelegatedStake, DelegatedTimelockedStake, EpochInfo, IotaCommittee, IotaObjectDataFilter,
13 StakeStatus, ValidatorApys,
14};
15use iota_open_rpc::Module;
16use iota_types::{
17 MoveTypeTagTrait,
18 base_types::{IotaAddress, MoveObjectType, ObjectID},
19 committee::EpochId,
20 dynamic_field::DynamicFieldInfo,
21 governance::StakedIota,
22 id::ID,
23 iota_serde::BigInt,
24 iota_system_state::{
25 PoolTokenExchangeRate,
26 iota_system_state_summary::{IotaSystemStateSummary, IotaSystemStateSummaryV1},
27 },
28 timelock::timelocked_staked_iota::TimelockedStakedIota,
29};
30use jsonrpsee::{RpcModule, core::RpcResult};
31use serde::{Serialize, de::DeserializeOwned};
32use tokio::sync::Mutex;
33
34use crate::{
35 errors::IndexerError, indexer_reader::IndexerReader, types::IotaSystemStateSummaryView,
36};
37
38const MAX_QUERY_STAKED_OBJECTS: usize = 1000;
40
41type ValidatorTable = (IotaAddress, ObjectID, ObjectID, u64, bool);
42
43#[derive(Clone)]
44pub struct GovernanceReadApi {
45 inner: IndexerReader,
46 exchange_rates_cache: Arc<Mutex<SizedCache<EpochId, Vec<ValidatorExchangeRates>>>>,
47 validators_apys_cache: Arc<Mutex<SizedCache<EpochId, BTreeMap<IotaAddress, f64>>>>,
48}
49
50impl GovernanceReadApi {
51 pub fn new(inner: IndexerReader) -> Self {
52 Self {
53 inner,
54 exchange_rates_cache: Arc::new(Mutex::new(SizedCache::with_size(1))),
55 validators_apys_cache: Arc::new(Mutex::new(SizedCache::with_size(1))),
56 }
57 }
58
59 pub async fn get_validator_apy(
61 &self,
62 address: &IotaAddress,
63 ) -> Result<Option<f64>, IndexerError> {
64 let apys = self
65 .validators_apys_map(self.get_validators_apy().await?)
66 .await;
67 Ok(apys.get(address).copied())
68 }
69
70 async fn get_validators_apy(&self) -> Result<ValidatorApys, IndexerError> {
71 let system_state_summary = self.get_latest_iota_system_state().await?;
72 let epoch = system_state_summary.epoch();
73
74 let exchange_rate_table = self.exchange_rates(&system_state_summary).await?;
75
76 let apys = iota_json_rpc::governance_api::calculate_apys(exchange_rate_table);
77
78 Ok(ValidatorApys { apys, epoch })
79 }
80
81 pub async fn get_epoch_info(&self, epoch: Option<EpochId>) -> Result<EpochInfo, IndexerError> {
82 match self
83 .inner
84 .spawn_blocking(move |this| this.get_epoch_info(epoch))
85 .await
86 {
87 Ok(Some(epoch_info)) => Ok(epoch_info),
88 Ok(None) => Err(IndexerError::InvalidArgument(format!(
89 "Missing epoch {epoch:?}"
90 ))),
91 Err(e) => Err(e),
92 }
93 }
94
95 async fn get_latest_iota_system_state(&self) -> Result<IotaSystemStateSummary, IndexerError> {
96 self.inner
97 .spawn_blocking(|this| this.get_latest_iota_system_state())
98 .await
99 }
100
101 async fn get_stakes_by_ids(
102 &self,
103 ids: Vec<ObjectID>,
104 ) -> Result<Vec<DelegatedStake>, IndexerError> {
105 let mut stakes = vec![];
106 for stored_object in self.inner.multi_get_objects_in_blocking_task(ids).await? {
107 let object = iota_types::object::Object::try_from(stored_object)?;
108 let stake_object = StakedIota::try_from(&object)?;
109 stakes.push(stake_object);
110 }
111
112 self.get_delegated_stakes(stakes).await
113 }
114
115 async fn get_staked_by_owner(
116 &self,
117 owner: IotaAddress,
118 ) -> Result<Vec<DelegatedStake>, IndexerError> {
119 let mut stakes = vec![];
120 for stored_object in self
121 .inner
122 .get_owned_objects_in_blocking_task(
123 owner,
124 Some(IotaObjectDataFilter::StructType(
125 MoveObjectType::staked_iota().into(),
126 )),
127 None,
128 MAX_QUERY_STAKED_OBJECTS,
129 )
130 .await?
131 {
132 let object = iota_types::object::Object::try_from(stored_object)?;
133 let stake_object = StakedIota::try_from(&object)?;
134 stakes.push(stake_object);
135 }
136
137 self.get_delegated_stakes(stakes).await
138 }
139
140 async fn get_timelocked_staked_by_owner(
141 &self,
142 owner: IotaAddress,
143 ) -> Result<Vec<DelegatedTimelockedStake>, IndexerError> {
144 let mut stakes = vec![];
145 for stored_object in self
146 .inner
147 .get_owned_objects_in_blocking_task(
148 owner,
149 Some(IotaObjectDataFilter::StructType(
150 MoveObjectType::timelocked_staked_iota().into(),
151 )),
152 None,
153 MAX_QUERY_STAKED_OBJECTS,
154 )
155 .await?
156 {
157 let object = iota_types::object::Object::try_from(stored_object)?;
158 let stake_object = TimelockedStakedIota::try_from(&object)?;
159 stakes.push(stake_object);
160 }
161
162 self.get_delegated_timelocked_stakes(stakes).await
163 }
164
165 pub async fn get_delegated_stakes(
166 &self,
167 stakes: Vec<StakedIota>,
168 ) -> Result<Vec<DelegatedStake>, IndexerError> {
169 let pools = stakes
170 .into_iter()
171 .fold(BTreeMap::<_, Vec<_>>::new(), |mut pools, stake| {
172 pools.entry(stake.pool_id()).or_default().push(stake);
173 pools
174 });
175
176 let system_state_summary = self.get_latest_iota_system_state().await?;
177 let epoch = system_state_summary.epoch();
178
179 let (candidate_rates, pending_rates) = tokio::try_join!(
180 self.candidate_validators_exchange_rate(&system_state_summary),
181 self.pending_validators_exchange_rate()
182 )?;
183
184 let rates = self
185 .exchange_rates(&system_state_summary)
186 .await?
187 .into_iter()
188 .chain(candidate_rates.into_iter())
189 .chain(pending_rates.into_iter())
190 .map(|rates| (rates.pool_id, rates))
191 .collect::<BTreeMap<_, _>>();
192
193 let mut delegated_stakes = vec![];
194 for (pool_id, stakes) in pools {
195 let rate_table = rates.get(&pool_id).ok_or_else(|| {
197 IndexerError::InvalidArgument(format!(
198 "Cannot find rates for staking pool {pool_id}"
199 ))
200 })?;
201 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
202
203 let mut delegations = vec![];
204 for stake in stakes {
205 let status = stake_status(
206 epoch,
207 stake.activation_epoch(),
208 stake.principal(),
209 rate_table,
210 current_rate,
211 );
212
213 delegations.push(iota_json_rpc_types::Stake {
214 staked_iota_id: stake.id(),
215 stake_request_epoch: stake.activation_epoch().saturating_sub(1),
217 stake_active_epoch: stake.activation_epoch(),
218 principal: stake.principal(),
219 status,
220 })
221 }
222 delegated_stakes.push(DelegatedStake {
223 validator_address: rate_table.address,
224 staking_pool: pool_id,
225 stakes: delegations,
226 })
227 }
228 Ok(delegated_stakes)
229 }
230
231 pub async fn get_delegated_timelocked_stakes(
232 &self,
233 stakes: Vec<TimelockedStakedIota>,
234 ) -> Result<Vec<DelegatedTimelockedStake>, IndexerError> {
235 let pools = stakes
236 .into_iter()
237 .fold(BTreeMap::<_, Vec<_>>::new(), |mut pools, stake| {
238 pools.entry(stake.pool_id()).or_default().push(stake);
239 pools
240 });
241
242 let system_state_summary = self.get_latest_iota_system_state().await?;
243 let epoch = system_state_summary.epoch();
244
245 let rates = self
246 .exchange_rates(&system_state_summary)
247 .await?
248 .into_iter()
249 .map(|rates| (rates.pool_id, rates))
250 .collect::<BTreeMap<_, _>>();
251
252 let mut delegated_stakes = vec![];
253 for (pool_id, stakes) in pools {
254 let rate_table = rates.get(&pool_id).ok_or_else(|| {
256 IndexerError::InvalidArgument(format!(
257 "Cannot find rates for staking pool {pool_id}"
258 ))
259 })?;
260 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
261
262 let mut delegations = vec![];
263 for stake in stakes {
264 let status = stake_status(
265 epoch,
266 stake.activation_epoch(),
267 stake.principal(),
268 rate_table,
269 current_rate,
270 );
271
272 delegations.push(iota_json_rpc_types::TimelockedStake {
273 timelocked_staked_iota_id: stake.id(),
274 stake_request_epoch: stake.activation_epoch().saturating_sub(1),
276 stake_active_epoch: stake.activation_epoch(),
277 principal: stake.principal(),
278 status,
279 expiration_timestamp_ms: stake.expiration_timestamp_ms(),
280 label: stake.label().clone(),
281 })
282 }
283 delegated_stakes.push(DelegatedTimelockedStake {
284 validator_address: rate_table.address,
285 staking_pool: pool_id,
286 stakes: delegations,
287 })
288 }
289 Ok(delegated_stakes)
290 }
291
292 async fn validators_apys_map(&self, apys: ValidatorApys) -> BTreeMap<IotaAddress, f64> {
294 if let Some(cached_apys) = self
296 .validators_apys_cache
297 .lock()
298 .await
299 .cache_get(&apys.epoch)
300 {
301 return cached_apys.clone();
302 }
303
304 let ret = BTreeMap::from_iter(apys.apys.iter().map(|x| (x.address, x.apy)));
305 self.validators_apys_cache
307 .lock()
308 .await
309 .cache_set(apys.epoch, ret.clone());
310
311 ret
312 }
313
314 async fn validator_exchange_rates(
316 &self,
317 tables: Vec<ValidatorTable>,
318 ) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
319 if tables.is_empty() {
320 return Ok(vec![]);
321 };
322
323 let mut exchange_rates = vec![];
324 for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
326 let mut rates = vec![];
327 for df in self
328 .inner
329 .get_dynamic_fields_raw_in_blocking_task(
330 exchange_rates_id,
331 None,
332 exchange_rates_size as usize,
333 )
334 .await?
335 {
336 let dynamic_field = df
337 .to_dynamic_field::<EpochId, PoolTokenExchangeRate>()
338 .ok_or_else(|| iota_types::error::IotaError::ObjectDeserialization {
339 error: "dynamic field malformed".to_owned(),
340 })?;
341
342 rates.push((dynamic_field.name, dynamic_field.value));
343 }
344
345 rates = backfill_rates(rates);
348
349 exchange_rates.push(ValidatorExchangeRates {
350 address,
351 pool_id,
352 active,
353 rates,
354 });
355 }
356 Ok(exchange_rates)
357 }
358
359 pub async fn exchange_rates(
363 &self,
364 system_state_summary: &IotaSystemStateSummary,
365 ) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
366 let epoch = system_state_summary.epoch();
367
368 let mut cache = self.exchange_rates_cache.lock().await;
369
370 if let Some(cached_rates) = cache.cache_get(&epoch) {
372 return Ok(cached_rates.clone());
373 }
374
375 let exchange_rates = self.compute_exchange_rates(system_state_summary).await?;
377
378 cache.cache_set(epoch, exchange_rates.clone());
380
381 Ok(exchange_rates)
382 }
383
384 async fn compute_exchange_rates(
386 &self,
387 system_state_summary: &IotaSystemStateSummary,
388 ) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
389 let (active_rates, inactive_rates) = tokio::try_join!(
390 self.active_validators_exchange_rate(system_state_summary),
391 self.inactive_validators_exchange_rate(system_state_summary)
392 )?;
393
394 Ok(active_rates
395 .into_iter()
396 .chain(inactive_rates.into_iter())
397 .collect())
398 }
399
400 async fn active_validators_exchange_rate(
402 &self,
403 system_state_summary: &IotaSystemStateSummary,
404 ) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
405 let tables = system_state_summary
406 .active_validators()
407 .iter()
408 .map(|validator| {
409 (
410 validator.iota_address,
411 validator.staking_pool_id,
412 validator.exchange_rates_id,
413 validator.exchange_rates_size,
414 true,
415 )
416 })
417 .collect();
418
419 self.validator_exchange_rates(tables).await
420 }
421
422 async fn inactive_validators_exchange_rate(
424 &self,
425 system_state_summary: &IotaSystemStateSummary,
426 ) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
427 let tables = self
428 .validator_summary_from_system_state(
429 system_state_summary.inactive_pools_id(),
430 system_state_summary.inactive_pools_size(),
431 |df| bcs::from_bytes::<ID>(&df.bcs_name).map_err(Into::into),
432 )
433 .await?;
434
435 self.validator_exchange_rates(tables).await
436 }
437
438 pub async fn pending_validators_exchange_rate(
444 &self,
445 ) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
446 let tables = self
448 .inner
449 .pending_active_validators()
450 .await?
451 .into_iter()
452 .map(|pending_active_validator| {
453 (
454 pending_active_validator.iota_address,
455 pending_active_validator.staking_pool_id,
456 pending_active_validator.exchange_rates_id,
457 pending_active_validator.exchange_rates_size,
458 false,
459 )
460 })
461 .collect::<Vec<ValidatorTable>>();
462
463 self.validator_exchange_rates(tables).await
464 }
465
466 pub async fn candidate_validators_exchange_rate(
472 &self,
473 system_state_summary: &IotaSystemStateSummary,
474 ) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
475 let tables = self
476 .validator_summary_from_system_state(
477 system_state_summary.validator_candidates_id(),
478 system_state_summary.validator_candidates_size(),
479 |df| bcs::from_bytes::<IotaAddress>(&df.bcs_name).map_err(Into::into),
480 )
481 .await?;
482
483 self.validator_exchange_rates(tables).await
484 }
485
486 async fn validator_summary_from_system_state<K, F>(
532 &self,
533 table_id: ObjectID,
534 validator_size: u64,
535 key: F,
536 ) -> Result<Vec<ValidatorTable>, IndexerError>
537 where
538 F: Fn(DynamicFieldInfo) -> Result<K, IndexerError>,
539 K: MoveTypeTagTrait + Serialize + DeserializeOwned + Debug + Send + 'static,
540 {
541 let dynamic_fields = self
542 .inner
543 .get_dynamic_fields_in_blocking_task(table_id, None, validator_size as usize)
544 .await?;
545
546 let mut tables = Vec::with_capacity(dynamic_fields.len());
547
548 for df in dynamic_fields {
549 let key = key(df)?;
550 let validator_candidate = self
551 .inner
552 .spawn_blocking(move |this| {
553 iota_types::iota_system_state::get_validator_from_table(&this, table_id, &key)
554 })
555 .await?;
556
557 tables.push((
558 validator_candidate.iota_address,
559 validator_candidate.staking_pool_id,
560 validator_candidate.exchange_rates_id,
561 validator_candidate.exchange_rates_size,
562 false,
563 ));
564 }
565
566 Ok(tables)
567 }
568}
569
570fn backfill_rates(
574 mut rates: Vec<(EpochId, PoolTokenExchangeRate)>,
575) -> Vec<(EpochId, PoolTokenExchangeRate)> {
576 if rates.is_empty() {
577 return rates;
578 }
579 rates.sort_unstable_by_key(|(epoch_id, _)| *epoch_id);
581
582 let (min_epoch, _) = rates.first().expect("rates should not be empty");
584 let (max_epoch, _) = rates.last().expect("rates should not be empty");
585 let expected_len = (max_epoch - min_epoch + 1) as usize;
586 let current_len = rates.len();
587
588 if current_len == expected_len {
590 rates.reverse();
591 return rates;
592 }
593
594 let mut filled_rates: Vec<(EpochId, PoolTokenExchangeRate)> = Vec::with_capacity(expected_len);
595 let mut missing_rates = Vec::with_capacity(expected_len - current_len);
596 for (epoch_id, rate) in rates {
597 if let Some((prev_epoch_id, prev_rate)) = filled_rates.last() {
599 for missing_epoch_id in prev_epoch_id + 1..epoch_id {
600 missing_rates.push((missing_epoch_id, prev_rate.clone()));
601 }
602 };
603
604 filled_rates.append(&mut missing_rates);
609 filled_rates.push((epoch_id, rate));
610 }
611 filled_rates.reverse();
612 filled_rates
613}
614
615fn stake_status(
616 epoch: u64,
617 activation_epoch: u64,
618 principal: u64,
619 rate_table: &ValidatorExchangeRates,
620 current_rate: Option<&PoolTokenExchangeRate>,
621) -> StakeStatus {
622 if epoch >= activation_epoch {
623 let estimated_reward = if let Some(current_rate) = current_rate {
624 let stake_rate = rate_table
625 .rates
626 .iter()
627 .find_map(|(epoch, rate)| (*epoch == activation_epoch).then(|| rate.clone()))
628 .unwrap_or_default();
629 let estimated_reward =
630 ((stake_rate.rate() / current_rate.rate()) - 1.0) * principal as f64;
631 std::cmp::max(0, estimated_reward.round() as u64)
632 } else {
633 0
634 };
635 StakeStatus::Active { estimated_reward }
636 } else {
637 StakeStatus::Pending
638 }
639}
640
641#[async_trait]
642impl GovernanceReadApiServer for GovernanceReadApi {
643 async fn get_stakes_by_ids(
644 &self,
645 staked_iota_ids: Vec<ObjectID>,
646 ) -> RpcResult<Vec<DelegatedStake>> {
647 self.get_stakes_by_ids(staked_iota_ids)
648 .await
649 .map_err(Into::into)
650 }
651
652 async fn get_stakes(&self, owner: IotaAddress) -> RpcResult<Vec<DelegatedStake>> {
653 self.get_staked_by_owner(owner).await.map_err(Into::into)
654 }
655
656 async fn get_timelocked_stakes_by_ids(
657 &self,
658 timelocked_staked_iota_ids: Vec<ObjectID>,
659 ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
660 let stakes = self
661 .inner
662 .multi_get_objects_in_blocking_task(timelocked_staked_iota_ids)
663 .await?
664 .into_iter()
665 .map(|stored_object| {
666 let object = iota_types::object::Object::try_from(stored_object)?;
667 TimelockedStakedIota::try_from(&object).map_err(IndexerError::from)
668 })
669 .collect::<Result<Vec<_>, _>>()?;
670
671 self.get_delegated_timelocked_stakes(stakes)
672 .await
673 .map_err(Into::into)
674 }
675
676 async fn get_timelocked_stakes(
677 &self,
678 owner: IotaAddress,
679 ) -> RpcResult<Vec<DelegatedTimelockedStake>> {
680 self.get_timelocked_staked_by_owner(owner)
681 .await
682 .map_err(Into::into)
683 }
684
685 async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<IotaCommittee> {
686 let epoch = self.get_epoch_info(epoch.as_deref().copied()).await?;
687 Ok(epoch.committee().map_err(IndexerError::from)?.into())
688 }
689
690 async fn get_latest_iota_system_state_v2(&self) -> RpcResult<IotaSystemStateSummary> {
691 Ok(self.get_latest_iota_system_state().await?)
692 }
693
694 async fn get_latest_iota_system_state(&self) -> RpcResult<IotaSystemStateSummaryV1> {
695 Ok(self
696 .get_latest_iota_system_state()
697 .await?
698 .try_into()
699 .map_err(IndexerError::from)?)
700 }
701
702 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
703 let epoch = self.get_epoch_info(None).await?;
704 Ok(BigInt::from(epoch.reference_gas_price.ok_or_else(
705 || {
706 IndexerError::PersistentStorageDataCorruption(
707 "missing latest reference gas price".to_owned(),
708 )
709 },
710 )?))
711 }
712
713 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
714 Ok(self.get_validators_apy().await?)
715 }
716}
717
718impl IotaRpcModule for GovernanceReadApi {
719 fn rpc(self) -> RpcModule<Self> {
720 self.into_rpc()
721 }
722
723 fn rpc_doc_module() -> Module {
724 iota_json_rpc_api::GovernanceReadApiOpenRpc::module_doc()
725 }
726}
727
#[cfg(test)]
mod tests {
    use iota_types::iota_system_state::PoolTokenExchangeRate;

    use super::*;

    /// Asserts that backfilling `input` produces exactly `expected`.
    fn assert_backfilled(
        input: Vec<(EpochId, PoolTokenExchangeRate)>,
        expected: Vec<(EpochId, PoolTokenExchangeRate)>,
    ) {
        assert_eq!(backfill_rates(input), expected);
    }

    #[test]
    fn test_backfill_rates_empty() {
        assert_backfilled(vec![], vec![]);
    }

    #[test]
    fn test_backfill_single_rate() {
        let only = PoolTokenExchangeRate::new_for_testing(100, 100);
        assert_backfilled(vec![(1, only.clone())], vec![(1, only)]);
    }

    #[test]
    fn test_backfill_rates_no_gaps() {
        let first = PoolTokenExchangeRate::new_for_testing(100, 100);
        let second = PoolTokenExchangeRate::new_for_testing(200, 220);
        let third = PoolTokenExchangeRate::new_for_testing(300, 330);

        let input = vec![(2, second.clone()), (3, third.clone()), (1, first.clone())];
        let expected = vec![(3, third), (2, second), (1, first)];
        assert_backfilled(input, expected);
    }

    #[test]
    fn test_backfill_rates_with_gaps() {
        let first = PoolTokenExchangeRate::new_for_testing(100, 100);
        let third = PoolTokenExchangeRate::new_for_testing(300, 330);
        let fifth = PoolTokenExchangeRate::new_for_testing(500, 550);

        let input = vec![(3, third.clone()), (1, first.clone()), (5, fifth.clone())];
        let expected = vec![
            (5, fifth),
            (4, third.clone()),
            (3, third),
            (2, first.clone()),
            (1, first),
        ];
        assert_backfilled(input, expected);
    }

    #[test]
    fn test_backfill_rates_missing_middle_epoch() {
        let first = PoolTokenExchangeRate::new_for_testing(100, 100);
        let third = PoolTokenExchangeRate::new_for_testing(300, 330);

        let input = vec![(1, first.clone()), (3, third.clone())];
        let expected = vec![(3, third), (2, first.clone()), (1, first)];
        assert_backfilled(input, expected);
    }

    #[test]
    fn test_backfill_rates_missing_middle_epochs() {
        let first = PoolTokenExchangeRate::new_for_testing(100, 100);
        let fourth = PoolTokenExchangeRate::new_for_testing(400, 440);

        let input = vec![(1, first.clone()), (4, fourth.clone())];
        let expected = vec![
            (4, fourth),
            (3, first.clone()),
            (2, first.clone()),
            (1, first),
        ];
        assert_backfilled(input, expected);
    }

    #[test]
    fn test_backfill_rates_unordered_input() {
        let first = PoolTokenExchangeRate::new_for_testing(100, 100);
        let third = PoolTokenExchangeRate::new_for_testing(300, 330);
        let fourth = PoolTokenExchangeRate::new_for_testing(400, 440);

        let input = vec![(3, third.clone()), (1, first.clone()), (4, fourth.clone())];
        let expected = vec![(4, fourth), (3, third), (2, first.clone()), (1, first)];
        assert_backfilled(input, expected);
    }
}