1use std::{
6 collections::BTreeMap,
7 ffi::OsString,
8 fmt, fs,
9 net::TcpListener,
10 path::{Path, PathBuf},
11 process::Command,
12 sync::{Arc, RwLock},
13 time::Duration,
14};
15
16use anyhow::{anyhow, bail};
17use axum::{
18 Extension, Json, Router,
19 extract::{Query, State},
20 middleware::{self, Next},
21 response::{IntoResponse, Response},
22 routing::get,
23};
24use hyper::{
25 HeaderMap, StatusCode,
26 http::{HeaderName, HeaderValue, Method},
27};
28use iota_metrics::RegistryService;
29use iota_move::manage_package::resolve_lock_file_path;
30use iota_move_build::{BuildConfig, IotaPackageHooks, implicit_deps};
31use iota_package_management::system_package_versions::latest_system_packages;
32use iota_sdk::{
33 IotaClientBuilder, rpc_types::IotaTransactionBlockEffects, types::base_types::ObjectID,
34};
35use iota_source_validation::{BytecodeSourceVerifier, ValidationMode};
36use move_core_types::account_address::AccountAddress;
37use move_package::{BuildConfig as MoveBuildConfig, LintFlag};
38use move_symbol_pool::Symbol;
39use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
40use serde::{Deserialize, Serialize};
41use tokio::sync::oneshot::Sender;
42use tower::ServiceBuilder;
43use tracing::{debug, info};
44use url::Url;
45
46pub const HOST_PORT_ENV: &str = "HOST_PORT";
47pub const IOTA_SOURCE_VALIDATION_VERSION_HEADER: &str = "x-iota-source-validation-version";
48pub const IOTA_SOURCE_VALIDATION_VERSION: &str = "0.1";
49
50pub const MAINNET_URL: &str = "https://api.mainnet.iota.cafe:443";
51pub const TESTNET_URL: &str = "https://api.testnet.iota.cafe:443";
52pub const DEVNET_URL: &str = "https://api.devnet.iota.cafe:443";
53pub const LOCALNET_URL: &str = "http://127.0.0.1:9000";
54
55pub const MAINNET_WS_URL: &str = "wss://api.mainnet.iota.cafe:443";
56pub const TESTNET_WS_URL: &str = "wss://api.testnet.iota.cafe:443";
57pub const DEVNET_WS_URL: &str = "wss://api.devnet.iota.cafe:443";
58pub const LOCALNET_WS_URL: &str = "ws://127.0.0.1:9000";
59
60pub const WS_PING_INTERVAL: Duration = Duration::from_millis(20_000);
61
62pub const METRICS_ROUTE: &str = "/metrics";
63pub const METRICS_HOST_PORT: &str = "0.0.0.0:9184";
64
65pub fn host_port() -> String {
66 match option_env!("HOST_PORT") {
67 Some(v) => v.to_string(),
68 None => String::from("0.0.0.0:8000"),
69 }
70}
71
72#[derive(Clone, Deserialize, Debug)]
73pub struct Config {
74 pub packages: Vec<PackageSource>,
75}
76
77#[derive(Clone, Deserialize, Debug)]
78#[serde(tag = "source", content = "values")]
79pub enum PackageSource {
80 Repository(RepositorySource),
81 Directory(DirectorySource),
82}
83
84#[derive(Clone, Deserialize, Debug)]
85pub struct RepositorySource {
86 pub repository: String,
87 pub network: Option<Network>,
88 pub branches: Vec<Branch>,
89}
90
91#[derive(Clone, Deserialize, Debug)]
92pub struct Branch {
93 pub branch: String,
94 pub paths: Vec<Package>,
95}
96
97#[derive(Clone, Deserialize, Debug)]
98pub struct DirectorySource {
99 pub paths: Vec<Package>,
100 pub network: Option<Network>,
101}
102
103#[derive(Clone, Deserialize, Debug)]
104pub struct Package {
105 pub path: String,
106 pub watch: Option<ObjectID>,
110}
111
112#[derive(Clone, Serialize, Debug)]
113pub struct SourceInfo {
114 pub path: PathBuf,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 pub source: Option<String>,
118}
119
120#[derive(Eq, PartialEq, Clone, Default, Serialize, Deserialize, Debug, Ord, PartialOrd)]
121#[serde(rename_all = "lowercase")]
122pub enum Network {
123 #[default]
124 #[serde(alias = "Mainnet")]
125 Mainnet,
126 #[serde(alias = "Testnet")]
127 Testnet,
128 #[serde(alias = "Devnet")]
129 Devnet,
130 #[serde(alias = "Localnet")]
131 Localnet,
132}
133
134impl fmt::Display for Network {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 write!(
137 f,
138 "{}",
139 match self {
140 Network::Mainnet => "mainnet",
141 Network::Testnet => "testnet",
142 Network::Devnet => "devnet",
143 Network::Localnet => "localnet",
144 }
145 )
146 }
147}
148
149pub type SourceLookup = BTreeMap<Symbol, SourceInfo>;
151pub type AddressLookup = BTreeMap<AccountAddress, SourceLookup>;
153pub type NetworkLookup = BTreeMap<Network, AddressLookup>;
156
157pub async fn verify_package(
158 network: &Network,
159 package_path: impl AsRef<Path>,
160) -> anyhow::Result<(Network, AddressLookup)> {
161 move_package::package_hooks::register_package_hooks(Box::new(IotaPackageHooks));
162 let network_url = match network {
164 Network::Mainnet => MAINNET_URL,
165 Network::Testnet => TESTNET_URL,
166 Network::Devnet => DEVNET_URL,
167 Network::Localnet => LOCALNET_URL,
168 };
169 let client = IotaClientBuilder::default().build(network_url).await?;
170 let chain_id = client.read_api().get_chain_identifier().await?;
171 let mut config =
172 resolve_lock_file_path(MoveBuildConfig::default(), Some(package_path.as_ref()))?;
173 config.lint_flag = LintFlag::LEVEL_NONE;
174 config.silence_warnings = true;
175 config.implicit_dependencies = implicit_deps(latest_system_packages());
176 let build_config = BuildConfig {
177 config,
178 run_bytecode_verifier: false, print_diags_to_stderr: false,
180 chain_id: Some(chain_id),
181 };
182 let compiled_package = build_config.build(package_path.as_ref())?;
183
184 BytecodeSourceVerifier::new(client.read_api())
185 .verify(&compiled_package, ValidationMode::root())
186 .await
187 .map_err(|e| anyhow!("Network {network}: {e}"))?;
188
189 let mut address_map = AddressLookup::new();
190 let address = compiled_package
191 .published_at
192 .as_ref()
193 .map(|id| **id)
194 .map_err(|_| anyhow!("could not resolve published-at field in package manifest"))?;
195 info!("verifying {} at {address}", package_path.as_ref().display());
196 for v in &compiled_package.package.root_compiled_units {
197 let path = v.source_path.to_path_buf();
198 let source = Some(fs::read_to_string(path.as_path())?);
199 let name = v.unit.name;
200 if let Some(existing) = address_map.get_mut(&address) {
201 existing.insert(name, SourceInfo { path, source });
202 } else {
203 let mut source_map = SourceLookup::new();
204 source_map.insert(name, SourceInfo { path, source });
205 address_map.insert(address, source_map);
206 }
207 }
208 Ok((network.clone(), address_map))
209}
210
211pub fn parse_config(config_path: impl AsRef<Path>) -> anyhow::Result<Config> {
212 let contents = fs::read_to_string(config_path)?;
213 Ok(toml::from_str(&contents)?)
214}
215
216pub fn repo_name_from_url(url: &str) -> anyhow::Result<String> {
217 let repo_url = Url::parse(url)?;
218 let mut components = repo_url
219 .path_segments()
220 .ok_or_else(|| anyhow!("Could not discover repository path in url {url}"))?;
221 let repo_name = components
222 .next_back()
223 .ok_or_else(|| anyhow!("Could not discover repository name in url {url}"))?;
224 Ok(repo_name.to_string())
225}
226
227#[derive(Debug)]
228pub struct CloneCommand {
231 args: Vec<Vec<OsString>>,
233 repo_url: String,
235}
236
237impl CloneCommand {
238 pub fn new(p: &RepositorySource, b: &Branch, dest: &Path) -> anyhow::Result<CloneCommand> {
239 let repo_name = repo_name_from_url(&p.repository)?;
240 let network = p.network.clone().unwrap_or_default().to_string();
241 let sanitized_branch = b.branch.replace('/', "__");
242 let dest = dest
243 .join(network)
244 .join(format!("{repo_name}__{sanitized_branch}"))
245 .into_os_string();
246
247 macro_rules! ostr {
248 ($arg:expr) => {
249 OsString::from($arg)
250 };
251 }
252
253 let mut args = vec![];
254 let cmd_args: Vec<OsString> = vec![
256 ostr!("clone"),
257 ostr!("--no-checkout"),
258 ostr!("--depth=1"), ostr!("--filter=tree:0"),
260 ostr!(format!("--branch={}", b.branch)),
261 ostr!(&p.repository),
262 ostr!(dest.clone()),
263 ];
264 args.push(cmd_args);
265
266 let mut cmd_args: Vec<OsString> = vec![
268 ostr!("-C"),
269 dest.clone(),
270 ostr!("sparse-checkout"),
271 ostr!("set"),
272 ostr!("--no-cone"),
273 ];
274 let path_args: Vec<OsString> = b
275 .paths
276 .iter()
277 .map(|p| OsString::from(p.path.clone()))
278 .collect();
279 cmd_args.extend_from_slice(&path_args);
280 args.push(cmd_args);
281
282 let cmd_args: Vec<OsString> = vec![ostr!("-C"), dest, ostr!("checkout")];
284 args.push(cmd_args);
285
286 Ok(Self {
287 args,
288 repo_url: p.repository.clone(),
289 })
290 }
291
292 pub async fn run(&self) -> anyhow::Result<()> {
293 for args in &self.args {
294 let result = Command::new("git").args(args).output().map_err(|_| {
295 anyhow!(
296 "Error cloning {} with command `git {:#?}`",
297 self.repo_url,
298 args
299 )
300 })?;
301 if !result.status.success() {
302 bail!(
303 "Nonzero exit status when cloning {} with command `git {:#?}`. \
304 Stderr: {}",
305 self.repo_url,
306 args,
307 String::from_utf8_lossy(&result.stderr)
308 )
309 }
310 }
311 Ok(())
312 }
313}
314
315pub async fn clone_repositories(repos: Vec<&RepositorySource>, dir: &Path) -> anyhow::Result<()> {
318 let mut tasks = vec![];
319 for p in &repos {
320 for b in &p.branches {
321 let command = CloneCommand::new(p, b, dir)?;
322 info!(
323 "cloning {}:{} to {}",
324 &p.repository,
325 &b.branch,
326 dir.display()
327 );
328 let t = tokio::spawn(async move { command.run().await });
329 tasks.push(t);
330 }
331 }
332
333 for t in tasks {
334 t.await.unwrap()?;
335 }
336 Ok(())
337}
338
339pub async fn initialize(
340 config: &Config,
341 dir: &Path,
342) -> anyhow::Result<(NetworkLookup, NetworkLookup)> {
343 let mut repos = vec![];
344 for s in &config.packages {
345 match s {
346 PackageSource::Repository(r) => repos.push(r),
347 PackageSource::Directory(_) => (), }
349 }
350 clone_repositories(repos, dir).await?;
351 let sources = verify_packages(config, dir).await?;
352 let sources_list = sources_list(&sources).await;
353 Ok((sources, sources_list))
354}
355
356pub async fn sources_list(sources: &NetworkLookup) -> NetworkLookup {
357 let mut sources_list = NetworkLookup::new();
358 for (network, addresses) in sources {
359 let mut address_map = AddressLookup::new();
360 for (address, symbols) in addresses {
361 let mut symbol_map = SourceLookup::new();
362 for (symbol, source_info) in symbols {
363 symbol_map.insert(
364 *symbol,
365 SourceInfo {
366 path: source_info.path.file_name().unwrap().into(),
367 source: None,
368 },
369 );
370 }
371 address_map.insert(*address, symbol_map);
372 }
373 sources_list.insert(network.clone(), address_map);
374 }
375 sources_list
376}
377
378pub async fn verify_packages(config: &Config, dir: &Path) -> anyhow::Result<NetworkLookup> {
379 let mut tasks = vec![];
380 for p in &config.packages {
381 match p {
382 PackageSource::Repository(r) => {
383 let repo_name = repo_name_from_url(&r.repository)?;
384 let network_name = r.network.clone().unwrap_or_default().to_string();
385 for b in &r.branches {
386 for p in &b.paths {
387 let sanitized_branch = b.branch.replace('/', "__");
388 let package_path = dir
389 .join(network_name.clone())
390 .join(format!("{repo_name}__{sanitized_branch}"))
391 .join(p.path.clone())
392 .clone();
393 let network = r.network.clone().unwrap_or_default();
394 let t =
395 tokio::spawn(
396 async move { verify_package(&network, package_path).await },
397 );
398 tasks.push(t)
399 }
400 }
401 }
402 PackageSource::Directory(packages_dir) => {
403 for p in &packages_dir.paths {
404 let package_path = PathBuf::from(p.path.clone());
405 let network = packages_dir.network.clone().unwrap_or_default();
406 let t =
407 tokio::spawn(async move { verify_package(&network, package_path).await });
408 tasks.push(t)
409 }
410 }
411 }
412 }
413
414 let mut mainnet_lookup = AddressLookup::new();
415 let mut testnet_lookup = AddressLookup::new();
416 let mut devnet_lookup = AddressLookup::new();
417 let mut localnet_lookup = AddressLookup::new();
418 for t in tasks {
419 let (network, new_lookup) = t.await.unwrap()?;
420 match network {
421 Network::Mainnet => mainnet_lookup.extend(new_lookup),
422 Network::Testnet => testnet_lookup.extend(new_lookup),
423 Network::Devnet => devnet_lookup.extend(new_lookup),
424 Network::Localnet => localnet_lookup.extend(new_lookup),
425 }
426 }
427 let mut lookup = NetworkLookup::new();
428 lookup.insert(Network::Mainnet, mainnet_lookup);
429 lookup.insert(Network::Testnet, testnet_lookup);
430 lookup.insert(Network::Devnet, devnet_lookup);
431 lookup.insert(Network::Localnet, localnet_lookup);
432 Ok(lookup)
433}
434
435pub async fn watch_for_upgrades(
443 _packages: Vec<PackageSource>,
444 _app_state: Arc<RwLock<AppState>>,
445 _network: Network,
446 _channel: Option<Sender<IotaTransactionBlockEffects>>,
447) -> anyhow::Result<()> {
448 Err(anyhow!(
449 "Fatal: JsonRPC Subscriptions no longer supported. Reimplement without using subscriptions."
450 ))
451}
452
453pub struct AppState {
454 pub sources: NetworkLookup,
455 pub metrics: Option<SourceServiceMetrics>,
456 pub sources_list: NetworkLookup,
457}
458
459pub async fn serve(app_state: Arc<RwLock<AppState>>) -> anyhow::Result<()> {
460 let app = Router::new()
461 .route("/api", get(api_route))
462 .route("/api/list", get(list_route))
463 .layer(
464 ServiceBuilder::new()
465 .layer(
466 tower_http::cors::CorsLayer::new()
467 .allow_methods([Method::GET])
468 .allow_origin(tower_http::cors::Any),
469 )
470 .layer(middleware::from_fn(check_version_header)),
471 )
472 .with_state(app_state);
473 let listener = TcpListener::bind(host_port())?;
474 listener.set_nonblocking(true).unwrap();
475 let listener = tokio::net::TcpListener::from_std(listener)?;
476 axum::serve(listener, app).await?;
477 Ok(())
478}
479
480#[derive(Deserialize)]
481pub struct Request {
482 #[serde(default)]
483 network: Network,
484 address: String,
485 module: String,
486}
487
488#[derive(Serialize, Deserialize)]
489pub struct SourceResponse {
490 pub source: String,
491}
492
493#[derive(Serialize, Deserialize)]
494pub struct ErrorResponse {
495 pub error: String,
496}
497
498async fn api_route(
499 State(app_state): State<Arc<RwLock<AppState>>>,
500 Query(Request {
501 network,
502 address,
503 module,
504 }): Query<Request>,
505) -> impl IntoResponse {
506 debug!("request network={network}&address={address}&module={module}");
507 let symbol = Symbol::from(module);
508 let Ok(address) = AccountAddress::from_hex_literal(&address) else {
509 let error = format!("Invalid hex address {address}");
510 return (
511 StatusCode::BAD_REQUEST,
512 Json(ErrorResponse { error }).into_response(),
513 );
514 };
515
516 let app_state = app_state.read().unwrap();
517 if let Some(metrics) = &app_state.metrics {
518 metrics.total_requests_received.inc();
519 }
520 let source_result = app_state
521 .sources
522 .get(&network)
523 .and_then(|n| n.get(&address))
524 .and_then(|a| a.get(&symbol));
525 if let Some(SourceInfo {
526 source: Some(source),
527 ..
528 }) = source_result
529 {
530 (
531 StatusCode::OK,
532 Json(SourceResponse {
533 source: source.to_owned(),
534 })
535 .into_response(),
536 )
537 } else {
538 (
539 StatusCode::NOT_FOUND,
540 Json(ErrorResponse {
541 error: format!(
542 "No source found for {symbol} at address {address} on network {network}"
543 ),
544 })
545 .into_response(),
546 )
547 }
548}
549
550async fn check_version_header(
551 headers: HeaderMap,
552 req: hyper::Request<axum::body::Body>,
553 next: Next,
554) -> Response {
555 let version = headers
556 .get(IOTA_SOURCE_VALIDATION_VERSION_HEADER)
557 .as_ref()
558 .and_then(|h| h.to_str().ok())
559 .map(|s| s.to_string());
560
561 match version {
562 Some(v) if v != IOTA_SOURCE_VALIDATION_VERSION => {
563 let error = format!(
564 "Unsupported version '{v}' specified in header \
565 {IOTA_SOURCE_VALIDATION_VERSION_HEADER}"
566 );
567 let mut headers = HeaderMap::new();
568 headers.insert(
569 HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
570 HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
571 );
572 return (
573 StatusCode::BAD_REQUEST,
574 headers,
575 Json(ErrorResponse { error }).into_response(),
576 )
577 .into_response();
578 }
579 _ => (),
580 }
581 let mut response = next.run(req).await;
582 response.headers_mut().insert(
583 HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
584 HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
585 );
586 response
587}
588
589async fn list_route(State(app_state): State<Arc<RwLock<AppState>>>) -> impl IntoResponse {
590 let app_state = app_state.read().unwrap();
591 (
592 StatusCode::OK,
593 Json(app_state.sources_list.clone()).into_response(),
594 )
595}
596
597pub struct SourceServiceMetrics {
598 pub total_requests_received: IntCounter,
599}
600
601impl SourceServiceMetrics {
602 pub fn new(registry: &Registry) -> Self {
603 Self {
604 total_requests_received: register_int_counter_with_registry!(
605 "total_requests",
606 "Total number of requests received by Source Service",
607 registry
608 )
609 .unwrap(),
610 }
611 }
612}
613
614pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {
615 let registry = Registry::new();
616
617 let registry_service = RegistryService::new(registry);
618
619 let app = Router::new()
620 .route(METRICS_ROUTE, get(iota_metrics::metrics))
621 .layer(Extension(registry_service.clone()));
622
623 tokio::spawn(async move {
624 listener.set_nonblocking(true).unwrap();
625 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
626 axum::serve(listener, app).await.unwrap();
627 });
628
629 registry_service
630}