iota_source_validation_service/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::BTreeMap,
7    ffi::OsString,
8    fmt, fs,
9    net::TcpListener,
10    path::{Path, PathBuf},
11    process::Command,
12    sync::{Arc, RwLock},
13    time::Duration,
14};
15
16use anyhow::{anyhow, bail};
17use axum::{
18    Extension, Json, Router,
19    extract::{Query, State},
20    middleware::{self, Next},
21    response::{IntoResponse, Response},
22    routing::get,
23};
24use hyper::{
25    HeaderMap, StatusCode,
26    http::{HeaderName, HeaderValue, Method},
27};
28use iota_metrics::RegistryService;
29use iota_move::manage_package::resolve_lock_file_path;
30use iota_move_build::{BuildConfig, IotaPackageHooks, implicit_deps};
31use iota_package_management::system_package_versions::latest_system_packages;
32use iota_sdk::{
33    IotaClientBuilder, rpc_types::IotaTransactionBlockEffects, types::base_types::ObjectID,
34};
35use iota_source_validation::{BytecodeSourceVerifier, ValidationMode};
36use move_core_types::account_address::AccountAddress;
37use move_package::{BuildConfig as MoveBuildConfig, LintFlag};
38use move_symbol_pool::Symbol;
39use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
40use serde::{Deserialize, Serialize};
41use tokio::sync::oneshot::Sender;
42use tower::ServiceBuilder;
43use tracing::{debug, info};
44use url::Url;
45
/// Name of the environment variable that carries the service's listen
/// address (see `host_port`).
pub const HOST_PORT_ENV: &str = "HOST_PORT";
/// HTTP header used to negotiate the API version; validated on requests and
/// set on responses by `check_version_header`.
pub const IOTA_SOURCE_VALIDATION_VERSION_HEADER: &str = "x-iota-source-validation-version";
/// API version implemented by this service.
pub const IOTA_SOURCE_VALIDATION_VERSION: &str = "0.1";

// HTTP(S) RPC endpoints, one per supported network.
pub const MAINNET_URL: &str = "https://api.mainnet.iota.cafe:443";
pub const TESTNET_URL: &str = "https://api.testnet.iota.cafe:443";
pub const DEVNET_URL: &str = "https://api.devnet.iota.cafe:443";
pub const LOCALNET_URL: &str = "http://127.0.0.1:9000";

// WebSocket endpoints, one per supported network.
pub const MAINNET_WS_URL: &str = "wss://api.mainnet.iota.cafe:443";
pub const TESTNET_WS_URL: &str = "wss://api.testnet.iota.cafe:443";
pub const DEVNET_WS_URL: &str = "wss://api.devnet.iota.cafe:443";
pub const LOCALNET_WS_URL: &str = "ws://127.0.0.1:9000";

/// Interval between WebSocket pings (20 seconds).
pub const WS_PING_INTERVAL: Duration = Duration::from_secs(20);

/// Route under which Prometheus metrics are served.
pub const METRICS_ROUTE: &str = "/metrics";
/// Default `host:port` for the metrics server.
pub const METRICS_HOST_PORT: &str = "0.0.0.0:9184";
64
65pub fn host_port() -> String {
66    match option_env!("HOST_PORT") {
67        Some(v) => v.to_string(),
68        None => String::from("0.0.0.0:8000"),
69    }
70}
71
72#[derive(Clone, Deserialize, Debug)]
73pub struct Config {
74    pub packages: Vec<PackageSource>,
75}
76
77#[derive(Clone, Deserialize, Debug)]
78#[serde(tag = "source", content = "values")]
79pub enum PackageSource {
80    Repository(RepositorySource),
81    Directory(DirectorySource),
82}
83
84#[derive(Clone, Deserialize, Debug)]
85pub struct RepositorySource {
86    pub repository: String,
87    pub network: Option<Network>,
88    pub branches: Vec<Branch>,
89}
90
91#[derive(Clone, Deserialize, Debug)]
92pub struct Branch {
93    pub branch: String,
94    pub paths: Vec<Package>,
95}
96
97#[derive(Clone, Deserialize, Debug)]
98pub struct DirectorySource {
99    pub paths: Vec<Package>,
100    pub network: Option<Network>,
101}
102
103#[derive(Clone, Deserialize, Debug)]
104pub struct Package {
105    pub path: String,
106    /// Optional object ID to watch for upgrades. For framework packages, this
107    /// is an address like 0x2. For non-framework packages this is an
108    /// upgrade cap (possibly wrapped).
109    pub watch: Option<ObjectID>,
110}
111
112#[derive(Clone, Serialize, Debug)]
113pub struct SourceInfo {
114    pub path: PathBuf,
115    #[serde(skip_serializing_if = "Option::is_none")]
116    // Is Some when content is hydrated from disk.
117    pub source: Option<String>,
118}
119
120#[derive(Eq, PartialEq, Clone, Default, Serialize, Deserialize, Debug, Ord, PartialOrd)]
121#[serde(rename_all = "lowercase")]
122pub enum Network {
123    #[default]
124    #[serde(alias = "Mainnet")]
125    Mainnet,
126    #[serde(alias = "Testnet")]
127    Testnet,
128    #[serde(alias = "Devnet")]
129    Devnet,
130    #[serde(alias = "Localnet")]
131    Localnet,
132}
133
134impl fmt::Display for Network {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        write!(
137            f,
138            "{}",
139            match self {
140                Network::Mainnet => "mainnet",
141                Network::Testnet => "testnet",
142                Network::Devnet => "devnet",
143                Network::Localnet => "localnet",
144            }
145        )
146    }
147}
148
149/// Map module name to verified source info.
150pub type SourceLookup = BTreeMap<Symbol, SourceInfo>;
151/// Map addresses to module names and sources.
152pub type AddressLookup = BTreeMap<AccountAddress, SourceLookup>;
153/// Top-level lookup that maps network to sources for corresponding on-chain
154/// networks.
155pub type NetworkLookup = BTreeMap<Network, AddressLookup>;
156
157pub async fn verify_package(
158    network: &Network,
159    package_path: impl AsRef<Path>,
160) -> anyhow::Result<(Network, AddressLookup)> {
161    move_package::package_hooks::register_package_hooks(Box::new(IotaPackageHooks));
162    // TODO(rvantonder): use config RPC URL instead of hardcoded URLs
163    let network_url = match network {
164        Network::Mainnet => MAINNET_URL,
165        Network::Testnet => TESTNET_URL,
166        Network::Devnet => DEVNET_URL,
167        Network::Localnet => LOCALNET_URL,
168    };
169    let client = IotaClientBuilder::default().build(network_url).await?;
170    let chain_id = client.read_api().get_chain_identifier().await?;
171    let mut config =
172        resolve_lock_file_path(MoveBuildConfig::default(), Some(package_path.as_ref()))?;
173    config.lint_flag = LintFlag::LEVEL_NONE;
174    config.silence_warnings = true;
175    config.implicit_dependencies = implicit_deps(latest_system_packages());
176    let build_config = BuildConfig {
177        config,
178        run_bytecode_verifier: false, // no need to run verifier if code is on-chain
179        print_diags_to_stderr: false,
180        chain_id: Some(chain_id),
181    };
182    let compiled_package = build_config.build(package_path.as_ref())?;
183
184    BytecodeSourceVerifier::new(client.read_api())
185        .verify(&compiled_package, ValidationMode::root())
186        .await
187        .map_err(|e| anyhow!("Network {network}: {e}"))?;
188
189    let mut address_map = AddressLookup::new();
190    let address = compiled_package
191        .published_at
192        .as_ref()
193        .map(|id| **id)
194        .map_err(|_| anyhow!("could not resolve published-at field in package manifest"))?;
195    info!("verifying {} at {address}", package_path.as_ref().display());
196    for v in &compiled_package.package.root_compiled_units {
197        let path = v.source_path.to_path_buf();
198        let source = Some(fs::read_to_string(path.as_path())?);
199        let name = v.unit.name;
200        if let Some(existing) = address_map.get_mut(&address) {
201            existing.insert(name, SourceInfo { path, source });
202        } else {
203            let mut source_map = SourceLookup::new();
204            source_map.insert(name, SourceInfo { path, source });
205            address_map.insert(address, source_map);
206        }
207    }
208    Ok((network.clone(), address_map))
209}
210
211pub fn parse_config(config_path: impl AsRef<Path>) -> anyhow::Result<Config> {
212    let contents = fs::read_to_string(config_path)?;
213    Ok(toml::from_str(&contents)?)
214}
215
216pub fn repo_name_from_url(url: &str) -> anyhow::Result<String> {
217    let repo_url = Url::parse(url)?;
218    let mut components = repo_url
219        .path_segments()
220        .ok_or_else(|| anyhow!("Could not discover repository path in url {url}"))?;
221    let repo_name = components
222        .next_back()
223        .ok_or_else(|| anyhow!("Could not discover repository name in url {url}"))?;
224    Ok(repo_name.to_string())
225}
226
/// A sequence of git commands that clone a repository and sparsely check out
/// the Move packages within it.
#[derive(Debug)]
pub struct CloneCommand {
    /// Argument lists, one per `git` invocation.
    args: Vec<Vec<OsString>>,
    /// Repository URL, reported in error messages.
    repo_url: String,
}
236
237impl CloneCommand {
238    pub fn new(p: &RepositorySource, b: &Branch, dest: &Path) -> anyhow::Result<CloneCommand> {
239        let repo_name = repo_name_from_url(&p.repository)?;
240        let network = p.network.clone().unwrap_or_default().to_string();
241        let sanitized_branch = b.branch.replace('/', "__");
242        let dest = dest
243            .join(network)
244            .join(format!("{repo_name}__{sanitized_branch}"))
245            .into_os_string();
246
247        macro_rules! ostr {
248            ($arg:expr) => {
249                OsString::from($arg)
250            };
251        }
252
253        let mut args = vec![];
254        // Args to clone empty repository
255        let cmd_args: Vec<OsString> = vec![
256            ostr!("clone"),
257            ostr!("--no-checkout"),
258            ostr!("--depth=1"), // implies --single-branch
259            ostr!("--filter=tree:0"),
260            ostr!(format!("--branch={}", b.branch)),
261            ostr!(&p.repository),
262            ostr!(dest.clone()),
263        ];
264        args.push(cmd_args);
265
266        // Args to sparse checkout the package set
267        let mut cmd_args: Vec<OsString> = vec![
268            ostr!("-C"),
269            dest.clone(),
270            ostr!("sparse-checkout"),
271            ostr!("set"),
272            ostr!("--no-cone"),
273        ];
274        let path_args: Vec<OsString> = b
275            .paths
276            .iter()
277            .map(|p| OsString::from(p.path.clone()))
278            .collect();
279        cmd_args.extend_from_slice(&path_args);
280        args.push(cmd_args);
281
282        // Args to checkout the default branch.
283        let cmd_args: Vec<OsString> = vec![ostr!("-C"), dest, ostr!("checkout")];
284        args.push(cmd_args);
285
286        Ok(Self {
287            args,
288            repo_url: p.repository.clone(),
289        })
290    }
291
292    pub async fn run(&self) -> anyhow::Result<()> {
293        for args in &self.args {
294            let result = Command::new("git").args(args).output().map_err(|_| {
295                anyhow!(
296                    "Error cloning {} with command `git {:#?}`",
297                    self.repo_url,
298                    args
299                )
300            })?;
301            if !result.status.success() {
302                bail!(
303                    "Nonzero exit status when cloning {} with command `git {:#?}`. \
304		     Stderr: {}",
305                    self.repo_url,
306                    args,
307                    String::from_utf8_lossy(&result.stderr)
308                )
309            }
310        }
311        Ok(())
312    }
313}
314
315/// Clones repositories and checks out packages as per `config` at the directory
316/// `dir`.
317pub async fn clone_repositories(repos: Vec<&RepositorySource>, dir: &Path) -> anyhow::Result<()> {
318    let mut tasks = vec![];
319    for p in &repos {
320        for b in &p.branches {
321            let command = CloneCommand::new(p, b, dir)?;
322            info!(
323                "cloning {}:{} to {}",
324                &p.repository,
325                &b.branch,
326                dir.display()
327            );
328            let t = tokio::spawn(async move { command.run().await });
329            tasks.push(t);
330        }
331    }
332
333    for t in tasks {
334        t.await.unwrap()?;
335    }
336    Ok(())
337}
338
339pub async fn initialize(
340    config: &Config,
341    dir: &Path,
342) -> anyhow::Result<(NetworkLookup, NetworkLookup)> {
343    let mut repos = vec![];
344    for s in &config.packages {
345        match s {
346            PackageSource::Repository(r) => repos.push(r),
347            PackageSource::Directory(_) => (), // skip cloning
348        }
349    }
350    clone_repositories(repos, dir).await?;
351    let sources = verify_packages(config, dir).await?;
352    let sources_list = sources_list(&sources).await;
353    Ok((sources, sources_list))
354}
355
356pub async fn sources_list(sources: &NetworkLookup) -> NetworkLookup {
357    let mut sources_list = NetworkLookup::new();
358    for (network, addresses) in sources {
359        let mut address_map = AddressLookup::new();
360        for (address, symbols) in addresses {
361            let mut symbol_map = SourceLookup::new();
362            for (symbol, source_info) in symbols {
363                symbol_map.insert(
364                    *symbol,
365                    SourceInfo {
366                        path: source_info.path.file_name().unwrap().into(),
367                        source: None,
368                    },
369                );
370            }
371            address_map.insert(*address, symbol_map);
372        }
373        sources_list.insert(network.clone(), address_map);
374    }
375    sources_list
376}
377
378pub async fn verify_packages(config: &Config, dir: &Path) -> anyhow::Result<NetworkLookup> {
379    let mut tasks = vec![];
380    for p in &config.packages {
381        match p {
382            PackageSource::Repository(r) => {
383                let repo_name = repo_name_from_url(&r.repository)?;
384                let network_name = r.network.clone().unwrap_or_default().to_string();
385                for b in &r.branches {
386                    for p in &b.paths {
387                        let sanitized_branch = b.branch.replace('/', "__");
388                        let package_path = dir
389                            .join(network_name.clone())
390                            .join(format!("{repo_name}__{sanitized_branch}"))
391                            .join(p.path.clone())
392                            .clone();
393                        let network = r.network.clone().unwrap_or_default();
394                        let t =
395                            tokio::spawn(
396                                async move { verify_package(&network, package_path).await },
397                            );
398                        tasks.push(t)
399                    }
400                }
401            }
402            PackageSource::Directory(packages_dir) => {
403                for p in &packages_dir.paths {
404                    let package_path = PathBuf::from(p.path.clone());
405                    let network = packages_dir.network.clone().unwrap_or_default();
406                    let t =
407                        tokio::spawn(async move { verify_package(&network, package_path).await });
408                    tasks.push(t)
409                }
410            }
411        }
412    }
413
414    let mut mainnet_lookup = AddressLookup::new();
415    let mut testnet_lookup = AddressLookup::new();
416    let mut devnet_lookup = AddressLookup::new();
417    let mut localnet_lookup = AddressLookup::new();
418    for t in tasks {
419        let (network, new_lookup) = t.await.unwrap()?;
420        match network {
421            Network::Mainnet => mainnet_lookup.extend(new_lookup),
422            Network::Testnet => testnet_lookup.extend(new_lookup),
423            Network::Devnet => devnet_lookup.extend(new_lookup),
424            Network::Localnet => localnet_lookup.extend(new_lookup),
425        }
426    }
427    let mut lookup = NetworkLookup::new();
428    lookup.insert(Network::Mainnet, mainnet_lookup);
429    lookup.insert(Network::Testnet, testnet_lookup);
430    lookup.insert(Network::Devnet, devnet_lookup);
431    lookup.insert(Network::Localnet, localnet_lookup);
432    Ok(lookup)
433}
434
435// A thread that monitors on-chain transactions for package upgrades. `config`
436// specifies which packages to watch. `app_state` contains the map of sources
437// returned by the server. In particular, `watch_for_upgrades` invalidates
438// (i.e., clears) the sources returned by the serve when we observe a package
439// upgrade, so that we do not falsely report outdated sources for a package.
440// Pass an optional `channel` to observe the upgrade transaction(s).
441// The `channel` parameter exists for testing.
442pub async fn watch_for_upgrades(
443    _packages: Vec<PackageSource>,
444    _app_state: Arc<RwLock<AppState>>,
445    _network: Network,
446    _channel: Option<Sender<IotaTransactionBlockEffects>>,
447) -> anyhow::Result<()> {
448    Err(anyhow!(
449        "Fatal: JsonRPC Subscriptions no longer supported. Reimplement without using subscriptions."
450    ))
451}
452
453pub struct AppState {
454    pub sources: NetworkLookup,
455    pub metrics: Option<SourceServiceMetrics>,
456    pub sources_list: NetworkLookup,
457}
458
459pub async fn serve(app_state: Arc<RwLock<AppState>>) -> anyhow::Result<()> {
460    let app = Router::new()
461        .route("/api", get(api_route))
462        .route("/api/list", get(list_route))
463        .layer(
464            ServiceBuilder::new()
465                .layer(
466                    tower_http::cors::CorsLayer::new()
467                        .allow_methods([Method::GET])
468                        .allow_origin(tower_http::cors::Any),
469                )
470                .layer(middleware::from_fn(check_version_header)),
471        )
472        .with_state(app_state);
473    let listener = TcpListener::bind(host_port())?;
474    listener.set_nonblocking(true).unwrap();
475    let listener = tokio::net::TcpListener::from_std(listener)?;
476    axum::serve(listener, app).await?;
477    Ok(())
478}
479
480#[derive(Deserialize)]
481pub struct Request {
482    #[serde(default)]
483    network: Network,
484    address: String,
485    module: String,
486}
487
488#[derive(Serialize, Deserialize)]
489pub struct SourceResponse {
490    pub source: String,
491}
492
493#[derive(Serialize, Deserialize)]
494pub struct ErrorResponse {
495    pub error: String,
496}
497
498async fn api_route(
499    State(app_state): State<Arc<RwLock<AppState>>>,
500    Query(Request {
501        network,
502        address,
503        module,
504    }): Query<Request>,
505) -> impl IntoResponse {
506    debug!("request network={network}&address={address}&module={module}");
507    let symbol = Symbol::from(module);
508    let Ok(address) = AccountAddress::from_hex_literal(&address) else {
509        let error = format!("Invalid hex address {address}");
510        return (
511            StatusCode::BAD_REQUEST,
512            Json(ErrorResponse { error }).into_response(),
513        );
514    };
515
516    let app_state = app_state.read().unwrap();
517    if let Some(metrics) = &app_state.metrics {
518        metrics.total_requests_received.inc();
519    }
520    let source_result = app_state
521        .sources
522        .get(&network)
523        .and_then(|n| n.get(&address))
524        .and_then(|a| a.get(&symbol));
525    if let Some(SourceInfo {
526        source: Some(source),
527        ..
528    }) = source_result
529    {
530        (
531            StatusCode::OK,
532            Json(SourceResponse {
533                source: source.to_owned(),
534            })
535            .into_response(),
536        )
537    } else {
538        (
539            StatusCode::NOT_FOUND,
540            Json(ErrorResponse {
541                error: format!(
542                    "No source found for {symbol} at address {address} on network {network}"
543                ),
544            })
545            .into_response(),
546        )
547    }
548}
549
550async fn check_version_header(
551    headers: HeaderMap,
552    req: hyper::Request<axum::body::Body>,
553    next: Next,
554) -> Response {
555    let version = headers
556        .get(IOTA_SOURCE_VALIDATION_VERSION_HEADER)
557        .as_ref()
558        .and_then(|h| h.to_str().ok())
559        .map(|s| s.to_string());
560
561    match version {
562        Some(v) if v != IOTA_SOURCE_VALIDATION_VERSION => {
563            let error = format!(
564                "Unsupported version '{v}' specified in header \
565		 {IOTA_SOURCE_VALIDATION_VERSION_HEADER}"
566            );
567            let mut headers = HeaderMap::new();
568            headers.insert(
569                HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
570                HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
571            );
572            return (
573                StatusCode::BAD_REQUEST,
574                headers,
575                Json(ErrorResponse { error }).into_response(),
576            )
577                .into_response();
578        }
579        _ => (),
580    }
581    let mut response = next.run(req).await;
582    response.headers_mut().insert(
583        HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
584        HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
585    );
586    response
587}
588
589async fn list_route(State(app_state): State<Arc<RwLock<AppState>>>) -> impl IntoResponse {
590    let app_state = app_state.read().unwrap();
591    (
592        StatusCode::OK,
593        Json(app_state.sources_list.clone()).into_response(),
594    )
595}
596
597pub struct SourceServiceMetrics {
598    pub total_requests_received: IntCounter,
599}
600
601impl SourceServiceMetrics {
602    pub fn new(registry: &Registry) -> Self {
603        Self {
604            total_requests_received: register_int_counter_with_registry!(
605                "total_requests",
606                "Total number of requests received by Source Service",
607                registry
608            )
609            .unwrap(),
610        }
611    }
612}
613
614pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {
615    let registry = Registry::new();
616
617    let registry_service = RegistryService::new(registry);
618
619    let app = Router::new()
620        .route(METRICS_ROUTE, get(iota_metrics::metrics))
621        .layer(Extension(registry_service.clone()));
622
623    tokio::spawn(async move {
624        listener.set_nonblocking(true).unwrap();
625        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
626        axum::serve(listener, app).await.unwrap();
627    });
628
629    registry_service
630}