iota_authority_aggregation/
lib.rs1use std::{
6    collections::{BTreeMap, BTreeSet},
7    sync::Arc,
8    time::Duration,
9};
10
11use futures::{Future, StreamExt, future::BoxFuture, stream::FuturesUnordered};
12use iota_metrics::monitored_future;
13use iota_types::{
14    base_types::ConciseableName,
15    committee::{CommitteeTrait, StakeUnit},
16};
17use tokio::time::timeout;
18
19pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;
20
/// Decision returned by the reducer after it has processed one authority's
/// response.
///
/// `S` is the accumulated state threaded through successive reducer calls and
/// `R` is the final value produced once the reduction succeeds.
#[derive(Debug)]
pub enum ReduceOutput<R, S> {
    /// Keep waiting for further responses, carrying the updated state forward.
    Continue(S),
    /// Stop immediately; the state is handed back to the caller as the error.
    Failed(S),
    /// Stop immediately; the value is handed back to the caller as the result.
    Success(R),
}
26
27pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
28    'a,
29    C,
30    K,
31    Client: 'a,
32    S,
33    V,
34    R,
35    E,
36    FMap,
37    FReduce,
38>(
39    committee: Arc<C>,
40    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
41    authority_preferences: Option<&BTreeSet<K>>,
42    initial_state: S,
43    map_each_authority: FMap,
44    reduce_result: FReduce,
45    initial_timeout: Duration,
46) -> Result<
47    (
48        R,
49        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
50    ),
51    S,
52>
53where
54    K: Ord + ConciseableName<'a> + Clone + 'a,
55    C: CommitteeTrait<K>,
56    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
57    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
58{
59    let authorities_shuffled = committee.shuffle_by_stake(authority_preferences, None);
60
61    let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
63        .into_iter()
64        .map(|name| {
65            let client = authority_clients[&name].clone();
66            let execute = map_each_authority.clone();
67            monitored_future!(async move { (name.clone(), execute(name, client).await,) })
68        })
69        .collect();
70
71    let current_timeout = initial_timeout;
72    let mut accumulated_state = initial_state;
73    while let Ok(Some((authority_name, result))) = timeout(current_timeout, responses.next()).await
75    {
76        let authority_weight = committee.weight(&authority_name);
77        accumulated_state =
78            match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
79                ReduceOutput::Continue(state) => state,
81                ReduceOutput::Failed(state) => {
82                    return Err(state);
83                }
84                ReduceOutput::Success(result) => {
85                    return Ok((result, responses));
87                }
88            }
89    }
90    Err(accumulated_state)
93}
94
95pub async fn quorum_map_then_reduce_with_timeout<
114    'a,
115    C,
116    K,
117    Client: 'a,
118    S: 'a,
119    V: 'a,
120    R: 'a,
121    E,
122    FMap,
123    FReduce,
124>(
125    committee: Arc<C>,
126    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
127    initial_state: S,
129    map_each_authority: FMap,
132    reduce_result: FReduce,
135    initial_timeout: Duration,
137) -> Result<
138    (
139        R,
140        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
141    ),
142    S,
143>
144where
145    K: Ord + ConciseableName<'a> + Clone + 'a,
146    C: CommitteeTrait<K>,
147    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
148    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
149{
150    quorum_map_then_reduce_with_timeout_and_prefs(
151        committee,
152        authority_clients,
153        None,
154        initial_state,
155        map_each_authority,
156        reduce_result,
157        initial_timeout,
158    )
159    .await
160}