1use std::{
6 collections::BTreeMap,
7 ffi::OsString,
8 fmt, fs,
9 net::TcpListener,
10 path::{Path, PathBuf},
11 process::Command,
12 sync::{Arc, RwLock},
13 time::Duration,
14};
15
16use anyhow::{anyhow, bail};
17use axum::{
18 Extension, Json, Router,
19 extract::{Query, State},
20 middleware::{self, Next},
21 response::{IntoResponse, Response},
22 routing::get,
23};
24use hyper::{
25 HeaderMap, StatusCode,
26 http::{HeaderName, HeaderValue, Method},
27};
28use iota_metrics::RegistryService;
29use iota_move::manage_package::resolve_lock_file_path;
30use iota_move_build::{BuildConfig, IotaPackageHooks};
31use iota_sdk::{
32 IotaClientBuilder, rpc_types::IotaTransactionBlockEffects, types::base_types::ObjectID,
33};
34use iota_source_validation::{BytecodeSourceVerifier, ValidationMode};
35use move_core_types::account_address::AccountAddress;
36use move_package::{BuildConfig as MoveBuildConfig, LintFlag};
37use move_symbol_pool::Symbol;
38use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
39use serde::{Deserialize, Serialize};
40use tokio::sync::oneshot::Sender;
41use tower::ServiceBuilder;
42use tracing::{debug, info};
43use url::Url;
44
/// Name of the environment variable consulted by [`host_port`] for the
/// service's `host:port` bind address.
pub const HOST_PORT_ENV: &str = "HOST_PORT";
/// HTTP header carrying the API version. `check_version_header` reads it from
/// requests and sets it on the responses it produces.
pub const IOTA_SOURCE_VALIDATION_VERSION_HEADER: &str = "x-iota-source-validation-version";
/// The only version value `check_version_header` accepts in
/// [`IOTA_SOURCE_VALIDATION_VERSION_HEADER`], and the value it sends back.
pub const IOTA_SOURCE_VALIDATION_VERSION: &str = "0.1";

/// Endpoint handed to `IotaClientBuilder::build` by [`verify_package`] for [`Network::Mainnet`].
pub const MAINNET_URL: &str = "https://api.mainnet.iota.cafe:443";
/// Endpoint handed to `IotaClientBuilder::build` by [`verify_package`] for [`Network::Testnet`].
pub const TESTNET_URL: &str = "https://api.testnet.iota.cafe:443";
/// Endpoint handed to `IotaClientBuilder::build` by [`verify_package`] for [`Network::Devnet`].
pub const DEVNET_URL: &str = "https://api.devnet.iota.cafe:443";
/// Endpoint handed to `IotaClientBuilder::build` by [`verify_package`] for [`Network::Localnet`].
pub const LOCALNET_URL: &str = "http://127.0.0.1:9000";

// The `*_WS_URL` constants are the `wss://` / `ws://` counterparts of the URLs
// above. NOTE(review): none of them is referenced by the code visible here.
/// WebSocket endpoint for mainnet.
pub const MAINNET_WS_URL: &str = "wss://api.mainnet.iota.cafe:443";
/// WebSocket endpoint for testnet.
pub const TESTNET_WS_URL: &str = "wss://api.testnet.iota.cafe:443";
/// WebSocket endpoint for devnet.
pub const DEVNET_WS_URL: &str = "wss://api.devnet.iota.cafe:443";
/// WebSocket endpoint for a local network.
pub const LOCALNET_WS_URL: &str = "ws://127.0.0.1:9000";

/// 20 seconds. Presumably the keep-alive ping interval for WebSocket
/// connections; not referenced by the code visible here - TODO confirm.
pub const WS_PING_INTERVAL: Duration = Duration::from_millis(20_000);

/// Route on which [`start_prometheus_server`] exposes metrics.
pub const METRICS_ROUTE: &str = "/metrics";
/// A `host:port` address for the metrics server. [`start_prometheus_server`]
/// takes an already-bound listener, so this is presumably what callers bind
/// to - TODO confirm.
pub const METRICS_HOST_PORT: &str = "0.0.0.0:9184";
63
/// Returns the `host:port` address the HTTP service should bind to.
///
/// Resolution order:
/// 1. the `HOST_PORT` environment variable of the *running* process (the name
///    stored in `HOST_PORT_ENV`), when it is set to non-empty, valid Unicode;
/// 2. the value `HOST_PORT` had when the binary was *compiled*
///    (`option_env!`), kept so existing builds behave as before;
/// 3. the default `0.0.0.0:8000`.
pub fn host_port() -> String {
    std::env::var("HOST_PORT")
        .ok()
        // An empty value cannot be a bind address; treat it as "not set".
        .filter(|value| !value.is_empty())
        // `option_env!` needs a string literal, hence the repeated name.
        .or_else(|| option_env!("HOST_PORT").map(|value| value.to_string()))
        .unwrap_or_else(|| String::from("0.0.0.0:8000"))
}
70
/// Service configuration, deserialized from TOML by [`parse_config`].
#[derive(Clone, Deserialize, Debug)]
pub struct Config {
    /// Every package source the service should verify.
    pub packages: Vec<PackageSource>,
}
75
/// Where a group of packages comes from.
///
/// With `tag = "source"` and `content = "values"`, the variant name is read
/// from the `source` field and its payload from the `values` field.
#[derive(Clone, Deserialize, Debug)]
#[serde(tag = "source", content = "values")]
pub enum PackageSource {
    /// Packages living in a git repository; cloned by [`clone_repositories`].
    Repository(RepositorySource),
    /// Packages already on the local filesystem; their paths are used as-is
    /// by [`verify_packages`].
    Directory(DirectorySource),
}
82
/// A git repository together with the branches (and packages within them) to verify.
#[derive(Clone, Deserialize, Debug)]
pub struct RepositorySource {
    /// Repository URL passed to `git clone`; its last path segment names the checkout.
    pub repository: String,
    /// Network to verify against. `None` falls back to `Network::default()`
    /// (`Mainnet`) in `CloneCommand::new` and `verify_packages`.
    pub network: Option<Network>,
    /// Branches to clone; one checkout directory is created per branch.
    pub branches: Vec<Branch>,
}
89
/// One branch of a [`RepositorySource`] and the packages to verify in it.
#[derive(Clone, Deserialize, Debug)]
pub struct Branch {
    /// Branch name, passed to git as `--branch=<name>`. Any `/` is replaced by
    /// `__` when building the checkout directory name.
    pub branch: String,
    /// Packages in this branch. Their paths are given to `git sparse-checkout set`
    /// and later joined onto the checkout directory.
    pub paths: Vec<Package>,
}
95
/// Packages that already exist on the local filesystem.
#[derive(Clone, Deserialize, Debug)]
pub struct DirectorySource {
    /// Packages to verify; each `path` is turned into a `PathBuf` unchanged.
    pub paths: Vec<Package>,
    /// Network to verify against. `None` falls back to `Network::default()` (`Mainnet`).
    pub network: Option<Network>,
}
101
/// A single package to verify.
#[derive(Clone, Deserialize, Debug)]
pub struct Package {
    /// Package location: relative to the checkout for repository sources, used
    /// verbatim for directory sources.
    pub path: String,
    /// Optional object ID. NOTE(review): presumably the object to watch for
    /// package upgrades; it is not read by any code visible here - confirm.
    pub watch: Option<ObjectID>,
}
110
/// Source file information for one module.
#[derive(Clone, Serialize, Debug)]
pub struct SourceInfo {
    /// Path of the module's source file. [`verify_package`] stores the path
    /// reported by the compiler; [`sources_list`] keeps only the file name.
    pub path: PathBuf,
    /// Contents of the source file. Left out of the serialized form when `None`,
    /// which is how [`sources_list`] produces a listing without source text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
}
118
/// The networks a package can be verified against.
///
/// (De)serialized in lowercase (`mainnet`, ...); each variant also accepts its
/// capitalized name (`Mainnet`, ...) on input. The default is `Mainnet`.
/// The `Ord` derive lets the type serve as the key of [`NetworkLookup`].
#[derive(Eq, PartialEq, Clone, Default, Serialize, Deserialize, Debug, Ord, PartialOrd)]
#[serde(rename_all = "lowercase")]
pub enum Network {
    /// Main network; used whenever no network is specified.
    #[default]
    #[serde(alias = "Mainnet")]
    Mainnet,
    /// Test network.
    #[serde(alias = "Testnet")]
    Testnet,
    /// Development network.
    #[serde(alias = "Devnet")]
    Devnet,
    /// Locally running network (see `LOCALNET_URL`).
    #[serde(alias = "Localnet")]
    Localnet,
}
132
133impl fmt::Display for Network {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 write!(
136 f,
137 "{}",
138 match self {
139 Network::Mainnet => "mainnet",
140 Network::Testnet => "testnet",
141 Network::Devnet => "devnet",
142 Network::Localnet => "localnet",
143 }
144 )
145 }
146}
147
/// Module name -> source information for that module.
pub type SourceLookup = BTreeMap<Symbol, SourceInfo>;
/// Package address -> the modules published at that address.
pub type AddressLookup = BTreeMap<AccountAddress, SourceLookup>;
/// Network -> the verified packages on that network.
pub type NetworkLookup = BTreeMap<Network, AddressLookup>;
155
156pub async fn verify_package(
157 network: &Network,
158 package_path: impl AsRef<Path>,
159) -> anyhow::Result<(Network, AddressLookup)> {
160 move_package::package_hooks::register_package_hooks(Box::new(IotaPackageHooks));
161 let network_url = match network {
163 Network::Mainnet => MAINNET_URL,
164 Network::Testnet => TESTNET_URL,
165 Network::Devnet => DEVNET_URL,
166 Network::Localnet => LOCALNET_URL,
167 };
168 let client = IotaClientBuilder::default().build(network_url).await?;
169 let chain_id = client.read_api().get_chain_identifier().await?;
170 let mut config =
171 resolve_lock_file_path(MoveBuildConfig::default(), Some(package_path.as_ref()))?;
172 config.lint_flag = LintFlag::LEVEL_NONE;
173 config.silence_warnings = true;
174 let build_config = BuildConfig {
175 config,
176 run_bytecode_verifier: false, print_diags_to_stderr: false,
178 chain_id: Some(chain_id),
179 };
180 let compiled_package = build_config.build(package_path.as_ref())?;
181
182 BytecodeSourceVerifier::new(client.read_api())
183 .verify(&compiled_package, ValidationMode::root())
184 .await
185 .map_err(|e| anyhow!("Network {network}: {e}"))?;
186
187 let mut address_map = AddressLookup::new();
188 let address = compiled_package
189 .published_at
190 .as_ref()
191 .map(|id| **id)
192 .map_err(|_| anyhow!("could not resolve published-at field in package manifest"))?;
193 info!("verifying {} at {address}", package_path.as_ref().display());
194 for v in &compiled_package.package.root_compiled_units {
195 let path = v.source_path.to_path_buf();
196 let source = Some(fs::read_to_string(path.as_path())?);
197 let name = v.unit.name;
198 if let Some(existing) = address_map.get_mut(&address) {
199 existing.insert(name, SourceInfo { path, source });
200 } else {
201 let mut source_map = SourceLookup::new();
202 source_map.insert(name, SourceInfo { path, source });
203 address_map.insert(address, source_map);
204 }
205 }
206 Ok((network.clone(), address_map))
207}
208
209pub fn parse_config(config_path: impl AsRef<Path>) -> anyhow::Result<Config> {
210 let contents = fs::read_to_string(config_path)?;
211 Ok(toml::from_str(&contents)?)
212}
213
214pub fn repo_name_from_url(url: &str) -> anyhow::Result<String> {
215 let repo_url = Url::parse(url)?;
216 let mut components = repo_url
217 .path_segments()
218 .ok_or_else(|| anyhow!("Could not discover repository path in url {url}"))?;
219 let repo_name = components
220 .next_back()
221 .ok_or_else(|| anyhow!("Could not discover repository name in url {url}"))?;
222 Ok(repo_name.to_string())
223}
224
/// The `git` invocations needed to fetch one branch of one repository.
#[derive(Debug)]
pub struct CloneCommand {
    /// One argument list per `git` invocation, executed in order by `run`.
    args: Vec<Vec<OsString>>,
    /// Repository URL, reported in the error messages produced by `run`.
    repo_url: String,
}
234
235impl CloneCommand {
236 pub fn new(p: &RepositorySource, b: &Branch, dest: &Path) -> anyhow::Result<CloneCommand> {
237 let repo_name = repo_name_from_url(&p.repository)?;
238 let network = p.network.clone().unwrap_or_default().to_string();
239 let sanitized_branch = b.branch.replace('/', "__");
240 let dest = dest
241 .join(network)
242 .join(format!("{repo_name}__{sanitized_branch}"))
243 .into_os_string();
244
245 macro_rules! ostr {
246 ($arg:expr) => {
247 OsString::from($arg)
248 };
249 }
250
251 let mut args = vec![];
252 let cmd_args: Vec<OsString> = vec![
254 ostr!("clone"),
255 ostr!("--no-checkout"),
256 ostr!("--depth=1"), ostr!("--filter=tree:0"),
258 ostr!(format!("--branch={}", b.branch)),
259 ostr!(&p.repository),
260 ostr!(dest.clone()),
261 ];
262 args.push(cmd_args);
263
264 let mut cmd_args: Vec<OsString> = vec![
266 ostr!("-C"),
267 dest.clone(),
268 ostr!("sparse-checkout"),
269 ostr!("set"),
270 ostr!("--no-cone"),
271 ];
272 let path_args: Vec<OsString> = b
273 .paths
274 .iter()
275 .map(|p| OsString::from(p.path.clone()))
276 .collect();
277 cmd_args.extend_from_slice(&path_args);
278 args.push(cmd_args);
279
280 let cmd_args: Vec<OsString> = vec![ostr!("-C"), dest, ostr!("checkout")];
282 args.push(cmd_args);
283
284 Ok(Self {
285 args,
286 repo_url: p.repository.clone(),
287 })
288 }
289
290 pub async fn run(&self) -> anyhow::Result<()> {
291 for args in &self.args {
292 let result = Command::new("git").args(args).output().map_err(|_| {
293 anyhow!(
294 "Error cloning {} with command `git {:#?}`",
295 self.repo_url,
296 args
297 )
298 })?;
299 if !result.status.success() {
300 bail!(
301 "Nonzero exit status when cloning {} with command `git {:#?}`. \
302 Stderr: {}",
303 self.repo_url,
304 args,
305 String::from_utf8_lossy(&result.stderr)
306 )
307 }
308 }
309 Ok(())
310 }
311}
312
313pub async fn clone_repositories(repos: Vec<&RepositorySource>, dir: &Path) -> anyhow::Result<()> {
316 let mut tasks = vec![];
317 for p in &repos {
318 for b in &p.branches {
319 let command = CloneCommand::new(p, b, dir)?;
320 info!(
321 "cloning {}:{} to {}",
322 &p.repository,
323 &b.branch,
324 dir.display()
325 );
326 let t = tokio::spawn(async move { command.run().await });
327 tasks.push(t);
328 }
329 }
330
331 for t in tasks {
332 t.await.unwrap()?;
333 }
334 Ok(())
335}
336
337pub async fn initialize(
338 config: &Config,
339 dir: &Path,
340) -> anyhow::Result<(NetworkLookup, NetworkLookup)> {
341 let mut repos = vec![];
342 for s in &config.packages {
343 match s {
344 PackageSource::Repository(r) => repos.push(r),
345 PackageSource::Directory(_) => (), }
347 }
348 clone_repositories(repos, dir).await?;
349 let sources = verify_packages(config, dir).await?;
350 let sources_list = sources_list(&sources).await;
351 Ok((sources, sources_list))
352}
353
354pub async fn sources_list(sources: &NetworkLookup) -> NetworkLookup {
355 let mut sources_list = NetworkLookup::new();
356 for (network, addresses) in sources {
357 let mut address_map = AddressLookup::new();
358 for (address, symbols) in addresses {
359 let mut symbol_map = SourceLookup::new();
360 for (symbol, source_info) in symbols {
361 symbol_map.insert(
362 *symbol,
363 SourceInfo {
364 path: source_info.path.file_name().unwrap().into(),
365 source: None,
366 },
367 );
368 }
369 address_map.insert(*address, symbol_map);
370 }
371 sources_list.insert(network.clone(), address_map);
372 }
373 sources_list
374}
375
376pub async fn verify_packages(config: &Config, dir: &Path) -> anyhow::Result<NetworkLookup> {
377 let mut tasks = vec![];
378 for p in &config.packages {
379 match p {
380 PackageSource::Repository(r) => {
381 let repo_name = repo_name_from_url(&r.repository)?;
382 let network_name = r.network.clone().unwrap_or_default().to_string();
383 for b in &r.branches {
384 for p in &b.paths {
385 let sanitized_branch = b.branch.replace('/', "__");
386 let package_path = dir
387 .join(network_name.clone())
388 .join(format!("{repo_name}__{sanitized_branch}"))
389 .join(p.path.clone())
390 .clone();
391 let network = r.network.clone().unwrap_or_default();
392 let t =
393 tokio::spawn(
394 async move { verify_package(&network, package_path).await },
395 );
396 tasks.push(t)
397 }
398 }
399 }
400 PackageSource::Directory(packages_dir) => {
401 for p in &packages_dir.paths {
402 let package_path = PathBuf::from(p.path.clone());
403 let network = packages_dir.network.clone().unwrap_or_default();
404 let t =
405 tokio::spawn(async move { verify_package(&network, package_path).await });
406 tasks.push(t)
407 }
408 }
409 }
410 }
411
412 let mut mainnet_lookup = AddressLookup::new();
413 let mut testnet_lookup = AddressLookup::new();
414 let mut devnet_lookup = AddressLookup::new();
415 let mut localnet_lookup = AddressLookup::new();
416 for t in tasks {
417 let (network, new_lookup) = t.await.unwrap()?;
418 match network {
419 Network::Mainnet => mainnet_lookup.extend(new_lookup),
420 Network::Testnet => testnet_lookup.extend(new_lookup),
421 Network::Devnet => devnet_lookup.extend(new_lookup),
422 Network::Localnet => localnet_lookup.extend(new_lookup),
423 }
424 }
425 let mut lookup = NetworkLookup::new();
426 lookup.insert(Network::Mainnet, mainnet_lookup);
427 lookup.insert(Network::Testnet, testnet_lookup);
428 lookup.insert(Network::Devnet, devnet_lookup);
429 lookup.insert(Network::Localnet, localnet_lookup);
430 Ok(lookup)
431}
432
433pub async fn watch_for_upgrades(
441 _packages: Vec<PackageSource>,
442 _app_state: Arc<RwLock<AppState>>,
443 _network: Network,
444 _channel: Option<Sender<IotaTransactionBlockEffects>>,
445) -> anyhow::Result<()> {
446 Err(anyhow!(
447 "Fatal: JsonRPC Subscriptions no longer supported. Reimplement without using subscriptions."
448 ))
449}
450
/// State shared by the HTTP handlers.
pub struct AppState {
    /// Full lookup, including source text; queried by the `/api` handler.
    pub sources: NetworkLookup,
    /// Optional metrics; when present, the `/api` handler increments the request counter.
    pub metrics: Option<SourceServiceMetrics>,
    /// Listing returned verbatim by the `/api/list` handler.
    pub sources_list: NetworkLookup,
}
456
457pub async fn serve(app_state: Arc<RwLock<AppState>>) -> anyhow::Result<()> {
458 let app = Router::new()
459 .route("/api", get(api_route))
460 .route("/api/list", get(list_route))
461 .layer(
462 ServiceBuilder::new()
463 .layer(
464 tower_http::cors::CorsLayer::new()
465 .allow_methods([Method::GET])
466 .allow_origin(tower_http::cors::Any),
467 )
468 .layer(middleware::from_fn(check_version_header)),
469 )
470 .with_state(app_state);
471 let listener = TcpListener::bind(host_port())?;
472 listener.set_nonblocking(true).unwrap();
473 let listener = tokio::net::TcpListener::from_std(listener)?;
474 axum::serve(listener, app).await?;
475 Ok(())
476}
477
/// Query parameters of the `/api` route.
#[derive(Deserialize)]
pub struct Request {
    /// Network to look in; `Network::default()` (`Mainnet`) when omitted.
    #[serde(default)]
    network: Network,
    /// Package address, parsed by the handler with `AccountAddress::from_hex_literal`.
    address: String,
    /// Name of the module whose source is requested.
    module: String,
}
485
/// JSON body of a successful `/api` response.
#[derive(Serialize, Deserialize)]
pub struct SourceResponse {
    /// Source text of the requested module.
    pub source: String,
}
490
/// JSON body of an error response.
#[derive(Serialize, Deserialize)]
pub struct ErrorResponse {
    /// Human-readable description of the error.
    pub error: String,
}
495
496async fn api_route(
497 State(app_state): State<Arc<RwLock<AppState>>>,
498 Query(Request {
499 network,
500 address,
501 module,
502 }): Query<Request>,
503) -> impl IntoResponse {
504 debug!("request network={network}&address={address}&module={module}");
505 let symbol = Symbol::from(module);
506 let Ok(address) = AccountAddress::from_hex_literal(&address) else {
507 let error = format!("Invalid hex address {address}");
508 return (
509 StatusCode::BAD_REQUEST,
510 Json(ErrorResponse { error }).into_response(),
511 );
512 };
513
514 let app_state = app_state.read().unwrap();
515 if let Some(metrics) = &app_state.metrics {
516 metrics.total_requests_received.inc();
517 }
518 let source_result = app_state
519 .sources
520 .get(&network)
521 .and_then(|n| n.get(&address))
522 .and_then(|a| a.get(&symbol));
523 if let Some(SourceInfo {
524 source: Some(source),
525 ..
526 }) = source_result
527 {
528 (
529 StatusCode::OK,
530 Json(SourceResponse {
531 source: source.to_owned(),
532 })
533 .into_response(),
534 )
535 } else {
536 (
537 StatusCode::NOT_FOUND,
538 Json(ErrorResponse {
539 error: format!(
540 "No source found for {symbol} at address {address} on network {network}"
541 ),
542 })
543 .into_response(),
544 )
545 }
546}
547
548async fn check_version_header(
549 headers: HeaderMap,
550 req: hyper::Request<axum::body::Body>,
551 next: Next,
552) -> Response {
553 let version = headers
554 .get(IOTA_SOURCE_VALIDATION_VERSION_HEADER)
555 .as_ref()
556 .and_then(|h| h.to_str().ok())
557 .map(|s| s.to_string());
558
559 match version {
560 Some(v) if v != IOTA_SOURCE_VALIDATION_VERSION => {
561 let error = format!(
562 "Unsupported version '{v}' specified in header \
563 {IOTA_SOURCE_VALIDATION_VERSION_HEADER}"
564 );
565 let mut headers = HeaderMap::new();
566 headers.insert(
567 HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
568 HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
569 );
570 return (
571 StatusCode::BAD_REQUEST,
572 headers,
573 Json(ErrorResponse { error }).into_response(),
574 )
575 .into_response();
576 }
577 _ => (),
578 }
579 let mut response = next.run(req).await;
580 response.headers_mut().insert(
581 HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
582 HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
583 );
584 response
585}
586
587async fn list_route(State(app_state): State<Arc<RwLock<AppState>>>) -> impl IntoResponse {
588 let app_state = app_state.read().unwrap();
589 (
590 StatusCode::OK,
591 Json(app_state.sources_list.clone()).into_response(),
592 )
593}
594
/// Prometheus metrics of the source service.
pub struct SourceServiceMetrics {
    /// Counter registered as `total_requests`; incremented by the `/api` handler.
    pub total_requests_received: IntCounter,
}
598
599impl SourceServiceMetrics {
600 pub fn new(registry: &Registry) -> Self {
601 Self {
602 total_requests_received: register_int_counter_with_registry!(
603 "total_requests",
604 "Total number of requests received by Source Service",
605 registry
606 )
607 .unwrap(),
608 }
609 }
610}
611
612pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {
613 let registry = Registry::new();
614
615 let registry_service = RegistryService::new(registry);
616
617 let app = Router::new()
618 .route(METRICS_ROUTE, get(iota_metrics::metrics))
619 .layer(Extension(registry_service.clone()));
620
621 tokio::spawn(async move {
622 listener.set_nonblocking(true).unwrap();
623 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
624 axum::serve(listener, app).await.unwrap();
625 });
626
627 registry_service
628}