iota_authority_aggregation/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet},
7    sync::Arc,
8    time::Duration,
9};
10
11use futures::{Future, StreamExt, future::BoxFuture, stream::FuturesUnordered};
12use iota_metrics::monitored_future;
13use iota_types::{
14    base_types::ConciseableName,
15    committee::{CommitteeTrait, StakeUnit},
16};
17use tokio::time::timeout;
18
/// Boxed future, valid for `'a`, that resolves to a `Result<T, E>`.
///
/// This is the return type required of the per-authority map closure
/// (`FMap`) accepted by the quorum helpers in this file.
pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;
20
/// Verdict produced by a reduce step after folding one authority response
/// into the accumulated state.
///
/// `S` is the accumulated state type and `R` is the final result type.
pub enum ReduceOutput<R, S> {
    /// Keep going: the carried state is used for the next response.
    Continue(S),
    /// Stop with a failure: the quorum helper returns `Err` with this state.
    Failed(S),
    /// Stop with a success: the quorum helper returns `Ok` with this result.
    Success(R),
}
26
27pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
28    'a,
29    C,
30    K,
31    Client: 'a,
32    S,
33    V,
34    R,
35    E,
36    FMap,
37    FReduce,
38>(
39    committee: Arc<C>,
40    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
41    authority_preferences: Option<&BTreeSet<K>>,
42    initial_state: S,
43    map_each_authority: FMap,
44    reduce_result: FReduce,
45    initial_timeout: Duration,
46) -> Result<
47    (
48        R,
49        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
50    ),
51    S,
52>
53where
54    K: Ord + ConciseableName<'a> + Clone + 'a,
55    C: CommitteeTrait<K>,
56    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
57    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
58{
59    let authorities_shuffled = committee.shuffle_by_stake(authority_preferences, None);
60
61    // First, execute in parallel for each authority FMap.
62    let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
63        .into_iter()
64        .map(|name| {
65            let client = authority_clients[&name].clone();
66            let execute = map_each_authority.clone();
67            monitored_future!(async move { (name.clone(), execute(name, client).await,) })
68        })
69        .collect();
70
71    let current_timeout = initial_timeout;
72    let mut accumulated_state = initial_state;
73    // Then, as results become available fold them into the state using FReduce.
74    while let Ok(Some((authority_name, result))) = timeout(current_timeout, responses.next()).await
75    {
76        let authority_weight = committee.weight(&authority_name);
77        accumulated_state =
78            match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
79                // In the first two cases we are told to continue the iteration.
80                ReduceOutput::Continue(state) => state,
81                ReduceOutput::Failed(state) => {
82                    return Err(state);
83                }
84                ReduceOutput::Success(result) => {
85                    // The reducer tells us that we have the result needed. Just return it.
86                    return Ok((result, responses));
87                }
88            }
89    }
90    // If we have exhausted all authorities and still have not returned a result,
91    // return error with the accumulated state.
92    Err(accumulated_state)
93}
94
95/// This function takes an initial state, than executes an asynchronous function
96/// (FMap) for each authority, and folds the results as they become available
97/// into the state using an async function (FReduce).
98///
99/// FMap can do io, and returns a result V. An error there may not be fatal, and
100/// could be consumed by the MReduce function to overall recover from it. This
101/// is necessary to ensure byzantine authorities cannot interrupt the logic of
102/// this function.
103///
104/// FReduce returns a result to a ReduceOutput. If the result is Err the
105/// function shortcuts and the Err is returned. An Ok ReduceOutput result can be
106/// used to shortcut and return the resulting state (ReduceOutput::End),
107/// continue the folding as new states arrive (ReduceOutput::Continue).
108///
109/// This function provides a flexible way to communicate with a quorum of
110/// authorities, processing and processing their results into a safe overall
111/// result, and also safely allowing operations to continue past the quorum to
112/// ensure all authorities are up to date (up to a timeout).
113pub async fn quorum_map_then_reduce_with_timeout<
114    'a,
115    C,
116    K,
117    Client: 'a,
118    S: 'a,
119    V: 'a,
120    R: 'a,
121    E,
122    FMap,
123    FReduce,
124>(
125    committee: Arc<C>,
126    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
127    // The initial state that will be used to fold in values from authorities.
128    initial_state: S,
129    // The async function used to apply to each authority. It takes an authority name,
130    // and authority client parameter and returns a Result<V>.
131    map_each_authority: FMap,
132    // The async function that takes an accumulated state, and a new result for V from an
133    // authority and returns a result to a ReduceOutput state.
134    reduce_result: FReduce,
135    // The initial timeout applied to all
136    initial_timeout: Duration,
137) -> Result<
138    (
139        R,
140        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
141    ),
142    S,
143>
144where
145    K: Ord + ConciseableName<'a> + Clone + 'a,
146    C: CommitteeTrait<K>,
147    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
148    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
149{
150    quorum_map_then_reduce_with_timeout_and_prefs(
151        committee,
152        authority_clients,
153        None,
154        initial_state,
155        map_each_authority,
156        reduce_result,
157        initial_timeout,
158    )
159    .await
160}