iota_authority_aggregation/
lib.rs1use std::{
6 collections::{BTreeMap, BTreeSet},
7 sync::Arc,
8 time::Duration,
9};
10
11use futures::{Future, StreamExt, future::BoxFuture, stream::FuturesUnordered};
12use iota_metrics::monitored_future;
13use iota_types::{
14 base_types::ConciseableName,
15 committee::{CommitteeTrait, StakeUnit},
16};
17use tokio::time::timeout;
18
19pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;
20
/// Decision returned by the reduce step after folding one authority's response into the state.
///
/// `R` is the final value produced on success; `S` is the accumulated state.
pub enum ReduceOutput<R, S> {
    /// Keep waiting for further responses, carrying the updated state forward.
    Continue(S),
    /// Stop the aggregation; the state is handed back to the caller as the error value.
    Failed(S),
    /// Stop the aggregation with the final value.
    Success(R),
}
26
27pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
28 'a,
29 C,
30 K,
31 Client: 'a,
32 S,
33 V,
34 R,
35 E,
36 FMap,
37 FReduce,
38>(
39 committee: Arc<C>,
40 authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
41 authority_preferences: Option<&BTreeSet<K>>,
42 initial_state: S,
43 map_each_authority: FMap,
44 reduce_result: FReduce,
45 initial_timeout: Duration,
46) -> Result<
47 (
48 R,
49 FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
50 ),
51 S,
52>
53where
54 K: Ord + ConciseableName<'a> + Clone + 'a,
55 C: CommitteeTrait<K>,
56 FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
57 FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
58{
59 let authorities_shuffled = committee.shuffle_by_stake(authority_preferences, None);
60
61 let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
63 .into_iter()
64 .map(|name| {
65 let client = authority_clients[&name].clone();
66 let execute = map_each_authority.clone();
67 monitored_future!(async move { (name.clone(), execute(name, client).await,) })
68 })
69 .collect();
70
71 let current_timeout = initial_timeout;
72 let mut accumulated_state = initial_state;
73 while let Ok(Some((authority_name, result))) = timeout(current_timeout, responses.next()).await
75 {
76 let authority_weight = committee.weight(&authority_name);
77 accumulated_state =
78 match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
79 ReduceOutput::Continue(state) => state,
81 ReduceOutput::Failed(state) => {
82 return Err(state);
83 }
84 ReduceOutput::Success(result) => {
85 return Ok((result, responses));
87 }
88 }
89 }
90 Err(accumulated_state)
93}
94
95pub async fn quorum_map_then_reduce_with_timeout<
114 'a,
115 C,
116 K,
117 Client: 'a,
118 S: 'a,
119 V: 'a,
120 R: 'a,
121 E,
122 FMap,
123 FReduce,
124>(
125 committee: Arc<C>,
126 authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
127 initial_state: S,
129 map_each_authority: FMap,
132 reduce_result: FReduce,
135 initial_timeout: Duration,
137) -> Result<
138 (
139 R,
140 FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
141 ),
142 S,
143>
144where
145 K: Ord + ConciseableName<'a> + Clone + 'a,
146 C: CommitteeTrait<K>,
147 FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
148 FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
149{
150 quorum_map_then_reduce_with_timeout_and_prefs(
151 committee,
152 authority_clients,
153 None,
154 initial_state,
155 map_each_authority,
156 reduce_result,
157 initial_timeout,
158 )
159 .await
160}