iota_proxy/
peers.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    sync::{Arc, RwLock},
8    time::Duration,
9};
10
11use anyhow::Result;
12use bcs;
13use fastcrypto::{ed25519::Ed25519PublicKey, traits::ToFromBytes};
14use iota_sdk::{IotaClient, IotaClientBuilder, rpc_types::IotaObjectDataOptions};
15use iota_tls::Allower;
16use iota_types::{
17    base_types::ObjectID,
18    dynamic_field::Field,
19    iota_system_state::{
20        iota_system_state_inner_v1::ValidatorV1,
21        iota_system_state_summary::{IotaSystemStateSummary, IotaValidatorSummary},
22    },
23};
24use itertools::Itertools;
25use tracing::{debug, error, info};
26
27/// IotaPeers is a mapping of public key to IotaPeer data
28pub type IotaPeers = Arc<RwLock<HashMap<Ed25519PublicKey, IotaPeer>>>;
29
30#[derive(Hash, PartialEq, Eq, Debug, Clone)]
31pub struct IotaPeer {
32    pub name: String,
33    pub public_key: Ed25519PublicKey,
34}
35
36/// IotaNodeProvider queries the iota blockchain and keeps a record of known
37/// validators based on the response from iota_getValidators.  The node name,
38/// public key and other info is extracted from the chain and stored in this
39/// data structure.  We pass this struct to the tls verifier and it depends on
40/// the state contained within. Handlers also use this data in an Extractor
41/// extension to check incoming clients on the http api against known keys.
42#[derive(Debug, Clone)]
43pub struct IotaNodeProvider {
44    active_validator_nodes: IotaPeers,
45    pending_validator_nodes: IotaPeers,
46    static_nodes: IotaPeers,
47    rpc_url: String,
48    rpc_poll_interval: Duration,
49}
50
51impl Allower for IotaNodeProvider {
52    fn allowed(&self, key: &Ed25519PublicKey) -> bool {
53        self.static_nodes.read().unwrap().contains_key(key)
54            || self
55                .active_validator_nodes
56                .read()
57                .unwrap()
58                .contains_key(key)
59            || self
60                .pending_validator_nodes
61                .read()
62                .unwrap()
63                .contains_key(key)
64    }
65}
66
67impl IotaNodeProvider {
68    pub fn new(rpc_url: String, rpc_poll_interval: Duration, static_peers: Vec<IotaPeer>) -> Self {
69        // build our hashmap with the static pub keys. we only do this one time at
70        // binary startup.
71        let static_nodes: HashMap<Ed25519PublicKey, IotaPeer> = static_peers
72            .into_iter()
73            .map(|v| (v.public_key.clone(), v))
74            .collect();
75        let static_nodes = Arc::new(RwLock::new(static_nodes));
76        let active_validator_nodes = Arc::new(RwLock::new(HashMap::new()));
77        let pending_validator_nodes = Arc::new(RwLock::new(HashMap::new()));
78        Self {
79            active_validator_nodes,
80            pending_validator_nodes,
81            static_nodes,
82            rpc_url,
83            rpc_poll_interval,
84        }
85    }
86
87    /// get is used to retrieve peer info in our handlers
88    pub fn get(&self, key: &Ed25519PublicKey) -> Option<IotaPeer> {
89        debug!("look for {:?}", key);
90        // check static nodes first
91        if let Some(v) = self.static_nodes.read().unwrap().get(key) {
92            return Some(IotaPeer {
93                name: v.name.to_owned(),
94                public_key: v.public_key.to_owned(),
95            });
96        }
97        // check active validators
98        if let Some(v) = self.active_validator_nodes.read().unwrap().get(key) {
99            return Some(IotaPeer {
100                name: v.name.to_owned(),
101                public_key: v.public_key.to_owned(),
102            });
103        }
104        // check pending validators
105        if let Some(v) = self.pending_validator_nodes.read().unwrap().get(key) {
106            return Some(IotaPeer {
107                name: v.name.to_owned(),
108                public_key: v.public_key.to_owned(),
109            });
110        }
111        None
112    }
113
114    /// Get a mutable reference to the allowed validator map
115    pub fn get_mut(&mut self) -> &mut IotaPeers {
116        &mut self.active_validator_nodes
117    }
118
119    /// Here we allow all active validators to be added to the allow list.
120    fn update_active_validator_set(&self, summary: &IotaSystemStateSummary) {
121        let active_validator_summaries = summary
122            .iter_active_validators()
123            .cloned()
124            .collect::<Vec<IotaValidatorSummary>>();
125
126        // Here we allow all active validators to be added to the allow list to make it
127        // more flexible.
128        let active_validators = extract_validators_from_summaries(&active_validator_summaries);
129        let mut allow = self.active_validator_nodes.write().unwrap();
130        allow.clear();
131        allow.extend(active_validators);
132        info!(
133            "{} iota validators managed to make it on the allow list",
134            allow.len()
135        );
136    }
137
138    fn update_pending_validator_set(&self, pending_validators: Vec<ValidatorV1>) {
139        let summaries = pending_validators
140            .into_iter()
141            .map(|v| v.into_iota_validator_summary())
142            .collect_vec();
143        let validators = extract_validators_from_summaries(&summaries);
144        let mut allow = self.pending_validator_nodes.write().unwrap();
145        allow.clear();
146        allow.extend(validators);
147        info!(
148            "{} iota pending validators managed to make it on the allow list",
149            allow.len()
150        );
151    }
152
153    async fn get_pending_validators(
154        iota_client: &IotaClient,
155        pending_active_validators_id: ObjectID,
156    ) -> Result<Vec<ValidatorV1>> {
157        let pending_validators_ids = iota_client
158            .read_api()
159            .get_dynamic_fields(pending_active_validators_id, None, None)
160            .await?
161            .data
162            .into_iter()
163            .map(|dyi| dyi.object_id)
164            .collect::<Vec<_>>();
165
166        let responses = iota_client
167            .read_api()
168            .multi_get_object_with_options(
169                pending_validators_ids,
170                IotaObjectDataOptions::default().with_bcs(),
171            )
172            .await?;
173
174        responses
175            .into_iter()
176            .map(|resp| {
177                let object_id = resp.object_id()?;
178                let bcs = resp.move_object_bcs().ok_or_else(|| {
179                    anyhow::anyhow!(
180                        "Object {object_id} does not exist or does not return bcs bytes",
181                    )
182                })?;
183                let field = bcs::from_bytes::<Field<u64, ValidatorV1>>(bcs).map_err(|e| {
184                anyhow::anyhow!(
185                    "Can't convert bcs bytes of object {object_id} to Field<u64, ValidatorV1>: {e}",
186                )
187            })?;
188
189                Ok(field.value)
190            })
191            .collect()
192    }
193
194    /// poll_peer_list will act as a refresh interval for our cache
195    pub fn poll_peer_list(&self) {
196        info!("Started polling for peers using rpc: {}", self.rpc_url);
197
198        let rpc_poll_interval = self.rpc_poll_interval;
199        let cloned_self = self.clone();
200        tokio::spawn(async move {
201            let mut interval = tokio::time::interval(rpc_poll_interval);
202            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
203
204            loop {
205                interval.tick().await;
206
207                match IotaClientBuilder::default()
208                    .build(&cloned_self.rpc_url)
209                    .await
210                {
211                    Ok(client) => {
212                        match client.governance_api().get_latest_iota_system_state().await {
213                            Ok(system_state) => {
214                                cloned_self.update_active_validator_set(&system_state);
215                                info!("Successfully updated active validators");
216
217                                let pending_active_validators_id = match &system_state {
218                                    IotaSystemStateSummary::V1(system_state) => {
219                                        system_state.pending_active_validators_id
220                                    }
221                                    IotaSystemStateSummary::V2(system_state) => {
222                                        system_state.pending_active_validators_id
223                                    }
224                                    _ => panic!("unsupported IotaSystemStateSummary"),
225                                };
226
227                                match Self::get_pending_validators(
228                                    &client,
229                                    pending_active_validators_id,
230                                )
231                                .await
232                                {
233                                    Ok(pending_validators) => {
234                                        cloned_self
235                                            .update_pending_validator_set(pending_validators);
236                                        info!("Successfully updated pending validators");
237                                    }
238                                    Err(e) => {
239                                        error!("Failed to get pending validators: {:?}", e);
240                                    }
241                                }
242                            }
243                            Err(e) => {
244                                error!("Failed to get latest iota system state: {:?}", e);
245                            }
246                        }
247                    }
248                    Err(e) => {
249                        error!("Failed to create IotaClient: {:?}", e);
250                    }
251                }
252            }
253        });
254    }
255}
256
257/// extract_validators_from_summaries will get the network pubkey bytes from a
258/// IotaValidatorSummary type. This type comes from a full node rpc result. The
259/// key here, if extracted successfully, will ultimately be stored in the allow
260/// list and let us communicate with those actual peers via tls.
261fn extract_validators_from_summaries(
262    validator_summaries: &[IotaValidatorSummary],
263) -> impl Iterator<Item = (Ed25519PublicKey, IotaPeer)> + use<'_> {
264    validator_summaries.iter().filter_map(|vm| {
265        match Ed25519PublicKey::from_bytes(&vm.network_pubkey_bytes) {
266            Ok(public_key) => {
267                debug!(
268                    "adding public key {:?} for iota validator {:?}",
269                    public_key, vm.name
270                );
271                Some((
272                    public_key.clone(),
273                    IotaPeer {
274                        name: vm.name.to_owned(),
275                        public_key,
276                    },
277                )) // scoped to filter_map
278            }
279            Err(error) => {
280                error!(
281                    "unable to decode public key for name: {:?} iota_address: {:?} error: {error}",
282                    vm.name, vm.iota_address
283                );
284                None // scoped to filter_map
285            }
286        }
287    })
288}
289
290#[cfg(test)]
291mod tests {
292    use iota_types::iota_system_state::iota_system_state_summary::IotaValidatorSummary;
293    use multiaddr::Multiaddr;
294
295    use super::*;
296    use crate::admin::{CertKeyPair, generate_self_cert};
297    #[test]
298    fn extract_validators_from_summary() {
299        let CertKeyPair(_, client_pub_key) = generate_self_cert("iota".into());
300        let p2p_address: Multiaddr = "/ip4/127.0.0.1/tcp/10000"
301            .parse()
302            .expect("expected a multiaddr value");
303        let summaries = vec![IotaValidatorSummary {
304            network_pubkey_bytes: Vec::from(client_pub_key.as_bytes()),
305            p2p_address: format!("{p2p_address}"),
306            primary_address: "empty".into(),
307            ..Default::default()
308        }];
309        let peers = extract_validators_from_summaries(&summaries);
310        assert_eq!(peers.count(), 1, "peers should have been a length of 1");
311    }
312}