iota_source_validation_service/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::BTreeMap,
7    ffi::OsString,
8    fmt, fs,
9    net::TcpListener,
10    path::{Path, PathBuf},
11    process::Command,
12    sync::{Arc, RwLock},
13    time::Duration,
14};
15
16use anyhow::{anyhow, bail};
17use axum::{
18    Extension, Json, Router,
19    extract::{Query, State},
20    middleware::{self, Next},
21    response::{IntoResponse, Response},
22    routing::get,
23};
24use hyper::{
25    HeaderMap, StatusCode,
26    http::{HeaderName, HeaderValue, Method},
27};
28use iota_metrics::RegistryService;
29use iota_move::manage_package::resolve_lock_file_path;
30use iota_move_build::{BuildConfig, IotaPackageHooks};
31use iota_sdk::{
32    IotaClientBuilder, rpc_types::IotaTransactionBlockEffects, types::base_types::ObjectID,
33};
34use iota_source_validation::{BytecodeSourceVerifier, ValidationMode};
35use move_core_types::account_address::AccountAddress;
36use move_package::{BuildConfig as MoveBuildConfig, LintFlag};
37use move_symbol_pool::Symbol;
38use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
39use serde::{Deserialize, Serialize};
40use tokio::sync::oneshot::Sender;
41use tower::ServiceBuilder;
42use tracing::{debug, info};
43use url::Url;
44
45pub const HOST_PORT_ENV: &str = "HOST_PORT";
46pub const IOTA_SOURCE_VALIDATION_VERSION_HEADER: &str = "x-iota-source-validation-version";
47pub const IOTA_SOURCE_VALIDATION_VERSION: &str = "0.1";
48
49pub const MAINNET_URL: &str = "https://api.mainnet.iota.cafe:443";
50pub const TESTNET_URL: &str = "https://api.testnet.iota.cafe:443";
51pub const DEVNET_URL: &str = "https://api.devnet.iota.cafe:443";
52pub const LOCALNET_URL: &str = "http://127.0.0.1:9000";
53
54pub const MAINNET_WS_URL: &str = "wss://api.mainnet.iota.cafe:443";
55pub const TESTNET_WS_URL: &str = "wss://api.testnet.iota.cafe:443";
56pub const DEVNET_WS_URL: &str = "wss://api.devnet.iota.cafe:443";
57pub const LOCALNET_WS_URL: &str = "ws://127.0.0.1:9000";
58
59pub const WS_PING_INTERVAL: Duration = Duration::from_millis(20_000);
60
61pub const METRICS_ROUTE: &str = "/metrics";
62pub const METRICS_HOST_PORT: &str = "0.0.0.0:9184";
63
64pub fn host_port() -> String {
65    match option_env!("HOST_PORT") {
66        Some(v) => v.to_string(),
67        None => String::from("0.0.0.0:8000"),
68    }
69}
70
71#[derive(Clone, Deserialize, Debug)]
72pub struct Config {
73    pub packages: Vec<PackageSource>,
74}
75
76#[derive(Clone, Deserialize, Debug)]
77#[serde(tag = "source", content = "values")]
78pub enum PackageSource {
79    Repository(RepositorySource),
80    Directory(DirectorySource),
81}
82
83#[derive(Clone, Deserialize, Debug)]
84pub struct RepositorySource {
85    pub repository: String,
86    pub network: Option<Network>,
87    pub branches: Vec<Branch>,
88}
89
90#[derive(Clone, Deserialize, Debug)]
91pub struct Branch {
92    pub branch: String,
93    pub paths: Vec<Package>,
94}
95
96#[derive(Clone, Deserialize, Debug)]
97pub struct DirectorySource {
98    pub paths: Vec<Package>,
99    pub network: Option<Network>,
100}
101
102#[derive(Clone, Deserialize, Debug)]
103pub struct Package {
104    pub path: String,
105    /// Optional object ID to watch for upgrades. For framework packages, this
106    /// is an address like 0x2. For non-framework packages this is an
107    /// upgrade cap (possibly wrapped).
108    pub watch: Option<ObjectID>,
109}
110
111#[derive(Clone, Serialize, Debug)]
112pub struct SourceInfo {
113    pub path: PathBuf,
114    #[serde(skip_serializing_if = "Option::is_none")]
115    // Is Some when content is hydrated from disk.
116    pub source: Option<String>,
117}
118
119#[derive(Eq, PartialEq, Clone, Default, Serialize, Deserialize, Debug, Ord, PartialOrd)]
120#[serde(rename_all = "lowercase")]
121pub enum Network {
122    #[default]
123    #[serde(alias = "Mainnet")]
124    Mainnet,
125    #[serde(alias = "Testnet")]
126    Testnet,
127    #[serde(alias = "Devnet")]
128    Devnet,
129    #[serde(alias = "Localnet")]
130    Localnet,
131}
132
133impl fmt::Display for Network {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        write!(
136            f,
137            "{}",
138            match self {
139                Network::Mainnet => "mainnet",
140                Network::Testnet => "testnet",
141                Network::Devnet => "devnet",
142                Network::Localnet => "localnet",
143            }
144        )
145    }
146}
147
148/// Map module name to verified source info.
149pub type SourceLookup = BTreeMap<Symbol, SourceInfo>;
150/// Map addresses to module names and sources.
151pub type AddressLookup = BTreeMap<AccountAddress, SourceLookup>;
152/// Top-level lookup that maps network to sources for corresponding on-chain
153/// networks.
154pub type NetworkLookup = BTreeMap<Network, AddressLookup>;
155
156pub async fn verify_package(
157    network: &Network,
158    package_path: impl AsRef<Path>,
159) -> anyhow::Result<(Network, AddressLookup)> {
160    move_package::package_hooks::register_package_hooks(Box::new(IotaPackageHooks));
161    // TODO(rvantonder): use config RPC URL instead of hardcoded URLs
162    let network_url = match network {
163        Network::Mainnet => MAINNET_URL,
164        Network::Testnet => TESTNET_URL,
165        Network::Devnet => DEVNET_URL,
166        Network::Localnet => LOCALNET_URL,
167    };
168    let client = IotaClientBuilder::default().build(network_url).await?;
169    let chain_id = client.read_api().get_chain_identifier().await?;
170    let mut config =
171        resolve_lock_file_path(MoveBuildConfig::default(), Some(package_path.as_ref()))?;
172    config.lint_flag = LintFlag::LEVEL_NONE;
173    config.silence_warnings = true;
174    let build_config = BuildConfig {
175        config,
176        run_bytecode_verifier: false, // no need to run verifier if code is on-chain
177        print_diags_to_stderr: false,
178        chain_id: Some(chain_id),
179    };
180    let compiled_package = build_config.build(package_path.as_ref())?;
181
182    BytecodeSourceVerifier::new(client.read_api())
183        .verify(&compiled_package, ValidationMode::root())
184        .await
185        .map_err(|e| anyhow!("Network {network}: {e}"))?;
186
187    let mut address_map = AddressLookup::new();
188    let address = compiled_package
189        .published_at
190        .as_ref()
191        .map(|id| **id)
192        .map_err(|_| anyhow!("could not resolve published-at field in package manifest"))?;
193    info!("verifying {} at {address}", package_path.as_ref().display());
194    for v in &compiled_package.package.root_compiled_units {
195        let path = v.source_path.to_path_buf();
196        let source = Some(fs::read_to_string(path.as_path())?);
197        let name = v.unit.name;
198        if let Some(existing) = address_map.get_mut(&address) {
199            existing.insert(name, SourceInfo { path, source });
200        } else {
201            let mut source_map = SourceLookup::new();
202            source_map.insert(name, SourceInfo { path, source });
203            address_map.insert(address, source_map);
204        }
205    }
206    Ok((network.clone(), address_map))
207}
208
209pub fn parse_config(config_path: impl AsRef<Path>) -> anyhow::Result<Config> {
210    let contents = fs::read_to_string(config_path)?;
211    Ok(toml::from_str(&contents)?)
212}
213
214pub fn repo_name_from_url(url: &str) -> anyhow::Result<String> {
215    let repo_url = Url::parse(url)?;
216    let mut components = repo_url
217        .path_segments()
218        .ok_or_else(|| anyhow!("Could not discover repository path in url {url}"))?;
219    let repo_name = components
220        .next_back()
221        .ok_or_else(|| anyhow!("Could not discover repository name in url {url}"))?;
222    Ok(repo_name.to_string())
223}
224
225#[derive(Debug)]
226/// Represents a sequence of git commands to clone a repository and sparsely
227/// checkout Move packages within.
228pub struct CloneCommand {
229    /// git args
230    args: Vec<Vec<OsString>>,
231    /// report repository url in error messages
232    repo_url: String,
233}
234
235impl CloneCommand {
236    pub fn new(p: &RepositorySource, b: &Branch, dest: &Path) -> anyhow::Result<CloneCommand> {
237        let repo_name = repo_name_from_url(&p.repository)?;
238        let network = p.network.clone().unwrap_or_default().to_string();
239        let sanitized_branch = b.branch.replace('/', "__");
240        let dest = dest
241            .join(network)
242            .join(format!("{repo_name}__{sanitized_branch}"))
243            .into_os_string();
244
245        macro_rules! ostr {
246            ($arg:expr) => {
247                OsString::from($arg)
248            };
249        }
250
251        let mut args = vec![];
252        // Args to clone empty repository
253        let cmd_args: Vec<OsString> = vec![
254            ostr!("clone"),
255            ostr!("--no-checkout"),
256            ostr!("--depth=1"), // implies --single-branch
257            ostr!("--filter=tree:0"),
258            ostr!(format!("--branch={}", b.branch)),
259            ostr!(&p.repository),
260            ostr!(dest.clone()),
261        ];
262        args.push(cmd_args);
263
264        // Args to sparse checkout the package set
265        let mut cmd_args: Vec<OsString> = vec![
266            ostr!("-C"),
267            dest.clone(),
268            ostr!("sparse-checkout"),
269            ostr!("set"),
270            ostr!("--no-cone"),
271        ];
272        let path_args: Vec<OsString> = b
273            .paths
274            .iter()
275            .map(|p| OsString::from(p.path.clone()))
276            .collect();
277        cmd_args.extend_from_slice(&path_args);
278        args.push(cmd_args);
279
280        // Args to checkout the default branch.
281        let cmd_args: Vec<OsString> = vec![ostr!("-C"), dest, ostr!("checkout")];
282        args.push(cmd_args);
283
284        Ok(Self {
285            args,
286            repo_url: p.repository.clone(),
287        })
288    }
289
290    pub async fn run(&self) -> anyhow::Result<()> {
291        for args in &self.args {
292            let result = Command::new("git").args(args).output().map_err(|_| {
293                anyhow!(
294                    "Error cloning {} with command `git {:#?}`",
295                    self.repo_url,
296                    args
297                )
298            })?;
299            if !result.status.success() {
300                bail!(
301                    "Nonzero exit status when cloning {} with command `git {:#?}`. \
302		     Stderr: {}",
303                    self.repo_url,
304                    args,
305                    String::from_utf8_lossy(&result.stderr)
306                )
307            }
308        }
309        Ok(())
310    }
311}
312
313/// Clones repositories and checks out packages as per `config` at the directory
314/// `dir`.
315pub async fn clone_repositories(repos: Vec<&RepositorySource>, dir: &Path) -> anyhow::Result<()> {
316    let mut tasks = vec![];
317    for p in &repos {
318        for b in &p.branches {
319            let command = CloneCommand::new(p, b, dir)?;
320            info!(
321                "cloning {}:{} to {}",
322                &p.repository,
323                &b.branch,
324                dir.display()
325            );
326            let t = tokio::spawn(async move { command.run().await });
327            tasks.push(t);
328        }
329    }
330
331    for t in tasks {
332        t.await.unwrap()?;
333    }
334    Ok(())
335}
336
337pub async fn initialize(
338    config: &Config,
339    dir: &Path,
340) -> anyhow::Result<(NetworkLookup, NetworkLookup)> {
341    let mut repos = vec![];
342    for s in &config.packages {
343        match s {
344            PackageSource::Repository(r) => repos.push(r),
345            PackageSource::Directory(_) => (), // skip cloning
346        }
347    }
348    clone_repositories(repos, dir).await?;
349    let sources = verify_packages(config, dir).await?;
350    let sources_list = sources_list(&sources).await;
351    Ok((sources, sources_list))
352}
353
354pub async fn sources_list(sources: &NetworkLookup) -> NetworkLookup {
355    let mut sources_list = NetworkLookup::new();
356    for (network, addresses) in sources {
357        let mut address_map = AddressLookup::new();
358        for (address, symbols) in addresses {
359            let mut symbol_map = SourceLookup::new();
360            for (symbol, source_info) in symbols {
361                symbol_map.insert(
362                    *symbol,
363                    SourceInfo {
364                        path: source_info.path.file_name().unwrap().into(),
365                        source: None,
366                    },
367                );
368            }
369            address_map.insert(*address, symbol_map);
370        }
371        sources_list.insert(network.clone(), address_map);
372    }
373    sources_list
374}
375
376pub async fn verify_packages(config: &Config, dir: &Path) -> anyhow::Result<NetworkLookup> {
377    let mut tasks = vec![];
378    for p in &config.packages {
379        match p {
380            PackageSource::Repository(r) => {
381                let repo_name = repo_name_from_url(&r.repository)?;
382                let network_name = r.network.clone().unwrap_or_default().to_string();
383                for b in &r.branches {
384                    for p in &b.paths {
385                        let sanitized_branch = b.branch.replace('/', "__");
386                        let package_path = dir
387                            .join(network_name.clone())
388                            .join(format!("{repo_name}__{sanitized_branch}"))
389                            .join(p.path.clone())
390                            .clone();
391                        let network = r.network.clone().unwrap_or_default();
392                        let t =
393                            tokio::spawn(
394                                async move { verify_package(&network, package_path).await },
395                            );
396                        tasks.push(t)
397                    }
398                }
399            }
400            PackageSource::Directory(packages_dir) => {
401                for p in &packages_dir.paths {
402                    let package_path = PathBuf::from(p.path.clone());
403                    let network = packages_dir.network.clone().unwrap_or_default();
404                    let t =
405                        tokio::spawn(async move { verify_package(&network, package_path).await });
406                    tasks.push(t)
407                }
408            }
409        }
410    }
411
412    let mut mainnet_lookup = AddressLookup::new();
413    let mut testnet_lookup = AddressLookup::new();
414    let mut devnet_lookup = AddressLookup::new();
415    let mut localnet_lookup = AddressLookup::new();
416    for t in tasks {
417        let (network, new_lookup) = t.await.unwrap()?;
418        match network {
419            Network::Mainnet => mainnet_lookup.extend(new_lookup),
420            Network::Testnet => testnet_lookup.extend(new_lookup),
421            Network::Devnet => devnet_lookup.extend(new_lookup),
422            Network::Localnet => localnet_lookup.extend(new_lookup),
423        }
424    }
425    let mut lookup = NetworkLookup::new();
426    lookup.insert(Network::Mainnet, mainnet_lookup);
427    lookup.insert(Network::Testnet, testnet_lookup);
428    lookup.insert(Network::Devnet, devnet_lookup);
429    lookup.insert(Network::Localnet, localnet_lookup);
430    Ok(lookup)
431}
432
433// A thread that monitors on-chain transactions for package upgrades. `config`
434// specifies which packages to watch. `app_state` contains the map of sources
435// returned by the server. In particular, `watch_for_upgrades` invalidates
436// (i.e., clears) the sources returned by the serve when we observe a package
437// upgrade, so that we do not falsely report outdated sources for a package.
438// Pass an optional `channel` to observe the upgrade transaction(s).
439// The `channel` parameter exists for testing.
440pub async fn watch_for_upgrades(
441    _packages: Vec<PackageSource>,
442    _app_state: Arc<RwLock<AppState>>,
443    _network: Network,
444    _channel: Option<Sender<IotaTransactionBlockEffects>>,
445) -> anyhow::Result<()> {
446    Err(anyhow!(
447        "Fatal: JsonRPC Subscriptions no longer supported. Reimplement without using subscriptions."
448    ))
449}
450
451pub struct AppState {
452    pub sources: NetworkLookup,
453    pub metrics: Option<SourceServiceMetrics>,
454    pub sources_list: NetworkLookup,
455}
456
457pub async fn serve(app_state: Arc<RwLock<AppState>>) -> anyhow::Result<()> {
458    let app = Router::new()
459        .route("/api", get(api_route))
460        .route("/api/list", get(list_route))
461        .layer(
462            ServiceBuilder::new()
463                .layer(
464                    tower_http::cors::CorsLayer::new()
465                        .allow_methods([Method::GET])
466                        .allow_origin(tower_http::cors::Any),
467                )
468                .layer(middleware::from_fn(check_version_header)),
469        )
470        .with_state(app_state);
471    let listener = TcpListener::bind(host_port())?;
472    listener.set_nonblocking(true).unwrap();
473    let listener = tokio::net::TcpListener::from_std(listener)?;
474    axum::serve(listener, app).await?;
475    Ok(())
476}
477
478#[derive(Deserialize)]
479pub struct Request {
480    #[serde(default)]
481    network: Network,
482    address: String,
483    module: String,
484}
485
486#[derive(Serialize, Deserialize)]
487pub struct SourceResponse {
488    pub source: String,
489}
490
491#[derive(Serialize, Deserialize)]
492pub struct ErrorResponse {
493    pub error: String,
494}
495
496async fn api_route(
497    State(app_state): State<Arc<RwLock<AppState>>>,
498    Query(Request {
499        network,
500        address,
501        module,
502    }): Query<Request>,
503) -> impl IntoResponse {
504    debug!("request network={network}&address={address}&module={module}");
505    let symbol = Symbol::from(module);
506    let Ok(address) = AccountAddress::from_hex_literal(&address) else {
507        let error = format!("Invalid hex address {address}");
508        return (
509            StatusCode::BAD_REQUEST,
510            Json(ErrorResponse { error }).into_response(),
511        );
512    };
513
514    let app_state = app_state.read().unwrap();
515    if let Some(metrics) = &app_state.metrics {
516        metrics.total_requests_received.inc();
517    }
518    let source_result = app_state
519        .sources
520        .get(&network)
521        .and_then(|n| n.get(&address))
522        .and_then(|a| a.get(&symbol));
523    if let Some(SourceInfo {
524        source: Some(source),
525        ..
526    }) = source_result
527    {
528        (
529            StatusCode::OK,
530            Json(SourceResponse {
531                source: source.to_owned(),
532            })
533            .into_response(),
534        )
535    } else {
536        (
537            StatusCode::NOT_FOUND,
538            Json(ErrorResponse {
539                error: format!(
540                    "No source found for {symbol} at address {address} on network {network}"
541                ),
542            })
543            .into_response(),
544        )
545    }
546}
547
548async fn check_version_header(
549    headers: HeaderMap,
550    req: hyper::Request<axum::body::Body>,
551    next: Next,
552) -> Response {
553    let version = headers
554        .get(IOTA_SOURCE_VALIDATION_VERSION_HEADER)
555        .as_ref()
556        .and_then(|h| h.to_str().ok())
557        .map(|s| s.to_string());
558
559    match version {
560        Some(v) if v != IOTA_SOURCE_VALIDATION_VERSION => {
561            let error = format!(
562                "Unsupported version '{v}' specified in header \
563		 {IOTA_SOURCE_VALIDATION_VERSION_HEADER}"
564            );
565            let mut headers = HeaderMap::new();
566            headers.insert(
567                HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
568                HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
569            );
570            return (
571                StatusCode::BAD_REQUEST,
572                headers,
573                Json(ErrorResponse { error }).into_response(),
574            )
575                .into_response();
576        }
577        _ => (),
578    }
579    let mut response = next.run(req).await;
580    response.headers_mut().insert(
581        HeaderName::from_static(IOTA_SOURCE_VALIDATION_VERSION_HEADER),
582        HeaderValue::from_static(IOTA_SOURCE_VALIDATION_VERSION),
583    );
584    response
585}
586
587async fn list_route(State(app_state): State<Arc<RwLock<AppState>>>) -> impl IntoResponse {
588    let app_state = app_state.read().unwrap();
589    (
590        StatusCode::OK,
591        Json(app_state.sources_list.clone()).into_response(),
592    )
593}
594
595pub struct SourceServiceMetrics {
596    pub total_requests_received: IntCounter,
597}
598
599impl SourceServiceMetrics {
600    pub fn new(registry: &Registry) -> Self {
601        Self {
602            total_requests_received: register_int_counter_with_registry!(
603                "total_requests",
604                "Total number of requests received by Source Service",
605                registry
606            )
607            .unwrap(),
608        }
609    }
610}
611
612pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {
613    let registry = Registry::new();
614
615    let registry_service = RegistryService::new(registry);
616
617    let app = Router::new()
618        .route(METRICS_ROUTE, get(iota_metrics::metrics))
619        .layer(Extension(registry_service.clone()));
620
621    tokio::spawn(async move {
622        listener.set_nonblocking(true).unwrap();
623        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
624        axum::serve(listener, app).await.unwrap();
625    });
626
627    registry_service
628}