1use std::{
6 collections::HashMap,
7 sync::{Arc, RwLock},
8 time::Duration,
9};
10
11use anyhow::Result;
12use bcs;
13use fastcrypto::{ed25519::Ed25519PublicKey, traits::ToFromBytes};
14use iota_sdk::{IotaClient, IotaClientBuilder, rpc_types::IotaObjectDataOptions};
15use iota_tls::Allower;
16use iota_types::{
17 base_types::ObjectID,
18 dynamic_field::Field,
19 iota_system_state::{
20 iota_system_state_inner_v1::ValidatorV1,
21 iota_system_state_summary::{IotaSystemStateSummary, IotaValidatorSummary},
22 },
23};
24use itertools::Itertools;
25use tracing::{debug, error, info};
26
27pub type IotaPeers = Arc<RwLock<HashMap<Ed25519PublicKey, IotaPeer>>>;
29
30#[derive(Hash, PartialEq, Eq, Debug, Clone)]
31pub struct IotaPeer {
32 pub name: String,
33 pub public_key: Ed25519PublicKey,
34}
35
36#[derive(Debug, Clone)]
43pub struct IotaNodeProvider {
44 active_validator_nodes: IotaPeers,
45 pending_validator_nodes: IotaPeers,
46 static_nodes: IotaPeers,
47 rpc_url: String,
48 rpc_poll_interval: Duration,
49}
50
51impl Allower for IotaNodeProvider {
52 fn allowed(&self, key: &Ed25519PublicKey) -> bool {
53 self.static_nodes.read().unwrap().contains_key(key)
54 || self
55 .active_validator_nodes
56 .read()
57 .unwrap()
58 .contains_key(key)
59 || self
60 .pending_validator_nodes
61 .read()
62 .unwrap()
63 .contains_key(key)
64 }
65}
66
67impl IotaNodeProvider {
68 pub fn new(rpc_url: String, rpc_poll_interval: Duration, static_peers: Vec<IotaPeer>) -> Self {
69 let static_nodes: HashMap<Ed25519PublicKey, IotaPeer> = static_peers
72 .into_iter()
73 .map(|v| (v.public_key.clone(), v))
74 .collect();
75 let static_nodes = Arc::new(RwLock::new(static_nodes));
76 let active_validator_nodes = Arc::new(RwLock::new(HashMap::new()));
77 let pending_validator_nodes = Arc::new(RwLock::new(HashMap::new()));
78 Self {
79 active_validator_nodes,
80 pending_validator_nodes,
81 static_nodes,
82 rpc_url,
83 rpc_poll_interval,
84 }
85 }
86
87 pub fn get(&self, key: &Ed25519PublicKey) -> Option<IotaPeer> {
89 debug!("look for {:?}", key);
90 if let Some(v) = self.static_nodes.read().unwrap().get(key) {
92 return Some(IotaPeer {
93 name: v.name.to_owned(),
94 public_key: v.public_key.to_owned(),
95 });
96 }
97 if let Some(v) = self.active_validator_nodes.read().unwrap().get(key) {
99 return Some(IotaPeer {
100 name: v.name.to_owned(),
101 public_key: v.public_key.to_owned(),
102 });
103 }
104 if let Some(v) = self.pending_validator_nodes.read().unwrap().get(key) {
106 return Some(IotaPeer {
107 name: v.name.to_owned(),
108 public_key: v.public_key.to_owned(),
109 });
110 }
111 None
112 }
113
114 pub fn get_mut(&mut self) -> &mut IotaPeers {
116 &mut self.active_validator_nodes
117 }
118
119 fn update_active_validator_set(&self, summary: &IotaSystemStateSummary) {
121 let active_validator_summaries = summary
122 .iter_active_validators()
123 .cloned()
124 .collect::<Vec<IotaValidatorSummary>>();
125
126 let active_validators = extract_validators_from_summaries(&active_validator_summaries);
129 let mut allow = self.active_validator_nodes.write().unwrap();
130 allow.clear();
131 allow.extend(active_validators);
132 info!(
133 "{} iota validators managed to make it on the allow list",
134 allow.len()
135 );
136 }
137
138 fn update_pending_validator_set(&self, pending_validators: Vec<ValidatorV1>) {
139 let summaries = pending_validators
140 .into_iter()
141 .map(|v| v.into_iota_validator_summary())
142 .collect_vec();
143 let validators = extract_validators_from_summaries(&summaries);
144 let mut allow = self.pending_validator_nodes.write().unwrap();
145 allow.clear();
146 allow.extend(validators);
147 info!(
148 "{} iota pending validators managed to make it on the allow list",
149 allow.len()
150 );
151 }
152
153 async fn get_pending_validators(
154 iota_client: &IotaClient,
155 pending_active_validators_id: ObjectID,
156 ) -> Result<Vec<ValidatorV1>> {
157 let pending_validators_ids = iota_client
158 .read_api()
159 .get_dynamic_fields(pending_active_validators_id, None, None)
160 .await?
161 .data
162 .into_iter()
163 .map(|dyi| dyi.object_id)
164 .collect::<Vec<_>>();
165
166 let responses = iota_client
167 .read_api()
168 .multi_get_object_with_options(
169 pending_validators_ids,
170 IotaObjectDataOptions::default().with_bcs(),
171 )
172 .await?;
173
174 responses
175 .into_iter()
176 .map(|resp| {
177 let object_id = resp.object_id()?;
178 let bcs = resp.move_object_bcs().ok_or_else(|| {
179 anyhow::anyhow!(
180 "Object {object_id} does not exist or does not return bcs bytes",
181 )
182 })?;
183 let field = bcs::from_bytes::<Field<u64, ValidatorV1>>(bcs).map_err(|e| {
184 anyhow::anyhow!(
185 "Can't convert bcs bytes of object {object_id} to Field<u64, ValidatorV1>: {e}",
186 )
187 })?;
188
189 Ok(field.value)
190 })
191 .collect()
192 }
193
194 pub fn poll_peer_list(&self) {
196 info!("Started polling for peers using rpc: {}", self.rpc_url);
197
198 let rpc_poll_interval = self.rpc_poll_interval;
199 let cloned_self = self.clone();
200 tokio::spawn(async move {
201 let mut interval = tokio::time::interval(rpc_poll_interval);
202 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
203
204 loop {
205 interval.tick().await;
206
207 match IotaClientBuilder::default()
208 .build(&cloned_self.rpc_url)
209 .await
210 {
211 Ok(client) => {
212 match client.governance_api().get_latest_iota_system_state().await {
213 Ok(system_state) => {
214 cloned_self.update_active_validator_set(&system_state);
215 info!("Successfully updated active validators");
216
217 let pending_active_validators_id = match &system_state {
218 IotaSystemStateSummary::V1(system_state) => {
219 system_state.pending_active_validators_id
220 }
221 IotaSystemStateSummary::V2(system_state) => {
222 system_state.pending_active_validators_id
223 }
224 _ => panic!("unsupported IotaSystemStateSummary"),
225 };
226
227 match Self::get_pending_validators(
228 &client,
229 pending_active_validators_id,
230 )
231 .await
232 {
233 Ok(pending_validators) => {
234 cloned_self
235 .update_pending_validator_set(pending_validators);
236 info!("Successfully updated pending validators");
237 }
238 Err(e) => {
239 error!("Failed to get pending validators: {:?}", e);
240 }
241 }
242 }
243 Err(e) => {
244 error!("Failed to get latest iota system state: {:?}", e);
245 }
246 }
247 }
248 Err(e) => {
249 error!("Failed to create IotaClient: {:?}", e);
250 }
251 }
252 }
253 });
254 }
255}
256
257fn extract_validators_from_summaries(
262 validator_summaries: &[IotaValidatorSummary],
263) -> impl Iterator<Item = (Ed25519PublicKey, IotaPeer)> + use<'_> {
264 validator_summaries.iter().filter_map(|vm| {
265 match Ed25519PublicKey::from_bytes(&vm.network_pubkey_bytes) {
266 Ok(public_key) => {
267 debug!(
268 "adding public key {:?} for iota validator {:?}",
269 public_key, vm.name
270 );
271 Some((
272 public_key.clone(),
273 IotaPeer {
274 name: vm.name.to_owned(),
275 public_key,
276 },
277 )) }
279 Err(error) => {
280 error!(
281 "unable to decode public key for name: {:?} iota_address: {:?} error: {error}",
282 vm.name, vm.iota_address
283 );
284 None }
286 }
287 })
288}
289
290#[cfg(test)]
291mod tests {
292 use iota_types::iota_system_state::iota_system_state_summary::IotaValidatorSummary;
293 use multiaddr::Multiaddr;
294
295 use super::*;
296 use crate::admin::{CertKeyPair, generate_self_cert};
297 #[test]
298 fn extract_validators_from_summary() {
299 let CertKeyPair(_, client_pub_key) = generate_self_cert("iota".into());
300 let p2p_address: Multiaddr = "/ip4/127.0.0.1/tcp/10000"
301 .parse()
302 .expect("expected a multiaddr value");
303 let summaries = vec![IotaValidatorSummary {
304 network_pubkey_bytes: Vec::from(client_pub_key.as_bytes()),
305 p2p_address: format!("{p2p_address}"),
306 primary_address: "empty".into(),
307 ..Default::default()
308 }];
309 let peers = extract_validators_from_summaries(&summaries);
310 assert_eq!(peers.count(), 1, "peers should have been a length of 1");
311 }
312}