iota_graphql_rpc/types/
available_range.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use async_graphql::*;
6use diesel::{CombineDsl, ExpressionMethods, QueryDsl, QueryResult};
7use iota_indexer::schema::{checkpoints, objects_snapshot};
8
9use crate::{
10    data::{Conn, Db, DbConnection, QueryExecutor},
11    error::Error,
12    types::checkpoint::{Checkpoint, CheckpointId},
13};
14#[derive(Clone, Debug, PartialEq, Eq, Copy)]
15pub(crate) struct AvailableRange {
16    pub first: u64,
17    pub last: u64,
18}
19
20/// Range of checkpoints that the RPC is guaranteed to produce a consistent
21/// response for.
22#[Object]
23impl AvailableRange {
24    async fn first(&self, ctx: &Context<'_>) -> Result<Option<Checkpoint>> {
25        Checkpoint::query(ctx, CheckpointId::by_seq_num(self.first), self.last)
26            .await
27            .extend()
28    }
29
30    async fn last(&self, ctx: &Context<'_>) -> Result<Option<Checkpoint>> {
31        Checkpoint::query(ctx, CheckpointId::by_seq_num(self.last), self.last)
32            .await
33            .extend()
34    }
35}
36
37impl AvailableRange {
38    /// Look up the available range when viewing the data consistently at
39    /// `checkpoint_viewed_at`.
40    pub(crate) async fn query(db: &Db, checkpoint_viewed_at: u64) -> Result<Self, Error> {
41        let Some(range): Option<Self> = db
42            .execute(move |conn| Self::result(conn, checkpoint_viewed_at))
43            .await
44            .map_err(|e| Error::Internal(format!("Failed to fetch available range: {e}")))?
45        else {
46            return Err(Error::Client(format!(
47                "Requesting data at checkpoint {checkpoint_viewed_at}, outside the available \
48                 range.",
49            )));
50        };
51
52        Ok(range)
53    }
54
55    /// Look up the available range when viewing the data consistently at
56    /// `checkpoint_viewed_at`. Made available on the `Conn` type to make it
57    /// easier to call as part of other queries.
58    ///
59    /// Returns an error if there was an issue querying the database, Ok(None)
60    /// if the checkpoint being viewed is not in the database's available
61    /// range, or Ok(Some(AvailableRange)) otherwise.
62    pub(crate) fn result(conn: &mut Conn, checkpoint_viewed_at: u64) -> QueryResult<Option<Self>> {
63        use checkpoints::dsl as checkpoints;
64        use objects_snapshot::dsl as snapshots;
65
66        let checkpoint_range: Vec<i64> = conn.results(move || {
67            let rhs = checkpoints::checkpoints
68                .select(checkpoints::sequence_number)
69                .order(checkpoints::sequence_number.desc())
70                .limit(1);
71
72            let lhs = snapshots::objects_snapshot
73                .select(snapshots::checkpoint_sequence_number)
74                .order(snapshots::checkpoint_sequence_number.desc())
75                .limit(1);
76            // We need to use `union_all` in case `lhs` and `rhs` have the same value.
77            lhs.union_all(rhs)
78        })?;
79
80        let (first, mut last) = match checkpoint_range.as_slice() {
81            [] => (0, 0),
82            [single_value] => (0, *single_value as u64),
83            values => {
84                let min_value = *values.iter().min().unwrap();
85                let max_value = *values.iter().max().unwrap();
86                (min_value as u64, max_value as u64)
87            }
88        };
89
90        if checkpoint_viewed_at < first || last < checkpoint_viewed_at {
91            return Ok(None);
92        }
93
94        last = checkpoint_viewed_at;
95        Ok(Some(Self { first, last }))
96    }
97}