1use std::{
6 collections::HashMap,
7 sync::{Arc, RwLock},
8 time::Duration,
9};
10
11use anyhow::Result;
12use bcs;
13use fastcrypto::{ed25519::Ed25519PublicKey, traits::ToFromBytes};
14use iota_sdk::{IotaClient, IotaClientBuilder, rpc_types::IotaObjectDataOptions};
15use iota_tls::Allower;
16use iota_types::{
17 base_types::ObjectID,
18 dynamic_field::Field,
19 iota_system_state::{
20 iota_system_state_inner_v1::ValidatorV1,
21 iota_system_state_summary::{IotaSystemStateSummary, IotaValidatorSummary},
22 },
23};
24use itertools::Itertools;
25use tracing::{debug, error, info};
26
27pub type AllowedPeers = Arc<RwLock<HashMap<Ed25519PublicKey, AllowedPeer>>>;
29
30#[derive(Hash, PartialEq, Eq, Debug, Clone)]
31pub struct AllowedPeer {
32 pub name: String,
33 pub public_key: Ed25519PublicKey,
34}
35
36#[derive(Debug, Clone)]
43pub struct IotaNodeProvider {
44 active_validator_nodes: AllowedPeers,
45 pending_validator_nodes: AllowedPeers,
46 static_nodes: AllowedPeers,
47 rpc_url: String,
48 rpc_poll_interval: Duration,
49}
50
51impl Allower for IotaNodeProvider {
52 fn allowed(&self, key: &Ed25519PublicKey) -> bool {
53 self.static_nodes.read().unwrap().contains_key(key)
54 || self
55 .active_validator_nodes
56 .read()
57 .unwrap()
58 .contains_key(key)
59 || self
60 .pending_validator_nodes
61 .read()
62 .unwrap()
63 .contains_key(key)
64 }
65}
66
67impl IotaNodeProvider {
68 pub fn new(
69 rpc_url: String,
70 rpc_poll_interval: Duration,
71 static_peers: Vec<AllowedPeer>,
72 ) -> Self {
73 let static_nodes: HashMap<Ed25519PublicKey, AllowedPeer> = static_peers
76 .into_iter()
77 .map(|v| (v.public_key.clone(), v))
78 .collect();
79 let static_nodes = Arc::new(RwLock::new(static_nodes));
80 let active_validator_nodes = Arc::new(RwLock::new(HashMap::new()));
81 let pending_validator_nodes = Arc::new(RwLock::new(HashMap::new()));
82 Self {
83 active_validator_nodes,
84 pending_validator_nodes,
85 static_nodes,
86 rpc_url,
87 rpc_poll_interval,
88 }
89 }
90
91 pub fn get(&self, key: &Ed25519PublicKey) -> Option<AllowedPeer> {
93 debug!("look for {:?}", key);
94 if let Some(v) = self.static_nodes.read().unwrap().get(key) {
96 return Some(AllowedPeer {
97 name: v.name.to_owned(),
98 public_key: v.public_key.to_owned(),
99 });
100 }
101 if let Some(v) = self.active_validator_nodes.read().unwrap().get(key) {
103 return Some(AllowedPeer {
104 name: v.name.to_owned(),
105 public_key: v.public_key.to_owned(),
106 });
107 }
108 if let Some(v) = self.pending_validator_nodes.read().unwrap().get(key) {
110 return Some(AllowedPeer {
111 name: v.name.to_owned(),
112 public_key: v.public_key.to_owned(),
113 });
114 }
115 None
116 }
117
118 pub fn get_mut(&mut self) -> &mut AllowedPeers {
120 &mut self.active_validator_nodes
121 }
122
123 fn update_active_validator_set(&self, summary: &IotaSystemStateSummary) {
125 let active_validator_summaries = summary
126 .iter_active_validators()
127 .cloned()
128 .collect::<Vec<IotaValidatorSummary>>();
129
130 let active_validators = extract_validators_from_summaries(&active_validator_summaries);
133 let mut allow = self.active_validator_nodes.write().unwrap();
134 allow.clear();
135 allow.extend(active_validators);
136 info!(
137 "{} iota validators managed to make it on the allow list",
138 allow.len()
139 );
140 }
141
142 fn update_pending_validator_set(&self, pending_validators: Vec<ValidatorV1>) {
143 let summaries = pending_validators
144 .into_iter()
145 .map(|v| v.into_iota_validator_summary())
146 .collect_vec();
147 let validators = extract_validators_from_summaries(&summaries);
148 let mut allow = self.pending_validator_nodes.write().unwrap();
149 allow.clear();
150 allow.extend(validators);
151 info!(
152 "{} iota pending validators managed to make it on the allow list",
153 allow.len()
154 );
155 }
156
157 async fn get_pending_validators(
158 iota_client: &IotaClient,
159 pending_active_validators_id: ObjectID,
160 ) -> Result<Vec<ValidatorV1>> {
161 let pending_validators_ids = iota_client
162 .read_api()
163 .get_dynamic_fields(pending_active_validators_id, None, None)
164 .await?
165 .data
166 .into_iter()
167 .map(|dyi| dyi.object_id)
168 .collect::<Vec<_>>();
169
170 let responses = iota_client
171 .read_api()
172 .multi_get_object_with_options(
173 pending_validators_ids,
174 IotaObjectDataOptions::default().with_bcs(),
175 )
176 .await?;
177
178 responses
179 .into_iter()
180 .map(|resp| {
181 let object_id = resp.object_id()?;
182 let bcs = resp.move_object_bcs().ok_or_else(|| {
183 anyhow::anyhow!(
184 "Object {object_id} does not exist or does not return bcs bytes",
185 )
186 })?;
187 let field = bcs::from_bytes::<Field<u64, ValidatorV1>>(bcs).map_err(|e| {
188 anyhow::anyhow!(
189 "Can't convert bcs bytes of object {object_id} to Field<u64, ValidatorV1>: {e}",
190 )
191 })?;
192
193 Ok(field.value)
194 })
195 .collect()
196 }
197
198 pub fn poll_peer_list(&self) {
200 info!("Started polling for peers using rpc: {}", self.rpc_url);
201
202 let rpc_poll_interval = self.rpc_poll_interval;
203 let cloned_self = self.clone();
204 tokio::spawn(async move {
205 let mut interval = tokio::time::interval(rpc_poll_interval);
206 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
207
208 loop {
209 interval.tick().await;
210
211 match IotaClientBuilder::default()
212 .build(&cloned_self.rpc_url)
213 .await
214 {
215 Ok(client) => {
216 match client.governance_api().get_latest_iota_system_state().await {
217 Ok(system_state) => {
218 cloned_self.update_active_validator_set(&system_state);
219 info!("Successfully updated active validators");
220
221 let pending_active_validators_id = match &system_state {
222 IotaSystemStateSummary::V1(system_state) => {
223 system_state.pending_active_validators_id
224 }
225 IotaSystemStateSummary::V2(system_state) => {
226 system_state.pending_active_validators_id
227 }
228 _ => panic!("unsupported IotaSystemStateSummary"),
229 };
230
231 match Self::get_pending_validators(
232 &client,
233 pending_active_validators_id,
234 )
235 .await
236 {
237 Ok(pending_validators) => {
238 cloned_self
239 .update_pending_validator_set(pending_validators);
240 info!("Successfully updated pending validators");
241 }
242 Err(e) => {
243 error!("Failed to get pending validators: {:?}", e);
244 }
245 }
246 }
247 Err(e) => {
248 error!("Failed to get latest iota system state: {:?}", e);
249 }
250 }
251 }
252 Err(e) => {
253 error!("Failed to create IotaClient: {:?}", e);
254 }
255 }
256 }
257 });
258 }
259}
260
261fn extract_validators_from_summaries(
266 validator_summaries: &[IotaValidatorSummary],
267) -> impl Iterator<Item = (Ed25519PublicKey, AllowedPeer)> + use<'_> {
268 validator_summaries.iter().filter_map(|vm| {
269 match Ed25519PublicKey::from_bytes(&vm.network_pubkey_bytes) {
270 Ok(public_key) => {
271 debug!(
272 "adding public key {:?} for iota validator {:?}",
273 public_key, vm.name
274 );
275 Some((
276 public_key.clone(),
277 AllowedPeer {
278 name: vm.name.to_owned(),
279 public_key,
280 },
281 )) }
283 Err(error) => {
284 error!(
285 "unable to decode public key for name: {:?} iota_address: {:?} error: {error}",
286 vm.name, vm.iota_address
287 );
288 None }
290 }
291 })
292}
293
#[cfg(test)]
mod tests {
    use iota_types::iota_system_state::iota_system_state_summary::IotaValidatorSummary;
    use multiaddr::Multiaddr;

    use super::*;
    use crate::admin::{CertKeyPair, generate_self_cert};

    /// A single summary whose network key bytes come from `generate_self_cert`
    /// must produce exactly one extracted peer.
    #[test]
    fn extract_validators_from_summary() {
        let CertKeyPair(_, client_pub_key) = generate_self_cert("iota".into());
        let p2p_address = "/ip4/127.0.0.1/tcp/10000"
            .parse::<Multiaddr>()
            .expect("expected a multiaddr value");

        let summary = IotaValidatorSummary {
            network_pubkey_bytes: client_pub_key.as_bytes().to_vec(),
            p2p_address: p2p_address.to_string(),
            primary_address: "empty".into(),
            ..Default::default()
        };
        let summaries = [summary];

        let peer_count = extract_validators_from_summaries(&summaries).count();
        assert_eq!(peer_count, 1, "peers should have been a length of 1");
    }
}