iota_graphql_rpc/
consistency.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use async_graphql::connection::CursorType;
6use iota_indexer::models::objects::StoredHistoryObject;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    filter, query,
11    raw_query::RawQuery,
12    types::{
13        available_range::AvailableRange,
14        cursor::{JsonCursor, Page, ScanLimited},
15        object::Cursor,
16    },
17};
18
/// The checkpoint sequence number for entities not available for view.
///
/// Its value is `u64::MAX`.
pub(crate) const UNAVAILABLE_CHECKPOINT_SEQUENCE_NUMBER: u64 = u64::MAX;
21
/// Selects which versions of an object a query keeps when several versions
/// fall within the checkpoint range.
#[derive(Clone, Copy)]
pub(crate) enum View {
    /// Keep every object that meets the filtering criteria, regardless of
    /// whether a more recent version of it exists within the checkpoint
    /// range. Used for lookups such as by `object_id` and `version`.
    Historical,
    /// Keep only objects that meet the filtering criteria and are the most
    /// recent version within the checkpoint range.
    Consistent,
}
32
33/// The consistent cursor for an index into a `Vec` field is constructed from
34/// the index of the element and the checkpoint the cursor was constructed at.
35#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
36pub(crate) struct ConsistentIndexCursor {
37    #[serde(rename = "i")]
38    pub ix: usize,
39    /// The checkpoint sequence number at which the entity corresponding to this
40    /// cursor was viewed at.
41    pub c: u64,
42}
43
44/// The consistent cursor for an index into a `Map` field is constructed from
45/// the name or key of the element and the checkpoint the cursor was constructed
46/// at.
47#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
48pub(crate) struct ConsistentNamedCursor {
49    #[serde(rename = "n")]
50    pub name: String,
51    /// The checkpoint sequence number at which the entity corresponding to this
52    /// cursor was viewed at.
53    pub c: u64,
54}
55
/// Trait for cursors that have a checkpoint sequence number associated with
/// them.
pub(crate) trait Checkpointed: CursorType {
    /// Returns the checkpoint sequence number associated with this cursor.
    fn checkpoint_viewed_at(&self) -> u64;
}
61
impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
    /// Returns the cursor's `c` value.
    fn checkpoint_viewed_at(&self) -> u64 {
        // NOTE(review): `c` is presumably reached by `JsonCursor` dereferencing
        // to the wrapped `ConsistentIndexCursor` — confirm against
        // `JsonCursor`.
        self.c
    }
}
67
impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
    /// Returns the cursor's `c` value.
    fn checkpoint_viewed_at(&self) -> u64 {
        // NOTE(review): `c` is presumably reached by `JsonCursor` dereferencing
        // to the wrapped `ConsistentNamedCursor` — confirm against
        // `JsonCursor`.
        self.c
    }
}
73
// Empty impl: no `ScanLimited` items are overridden for index cursors.
impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}
75
// Empty impl: no `ScanLimited` items are overridden for named cursors.
impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
77
78/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history`
79/// table to fetch objects that satisfy some filtering criteria `filter_fn`
80/// within the provided checkpoint range `lhs` and `rhs`. The `objects_snapshot`
81/// table contains the latest versions of objects up to a checkpoint sequence
82/// number, and `objects_history` captures changes after that, so a query to
83/// both tables is necessary to handle these object states:
84/// 1) In snapshot, not in history - occurs when an object gets snapshotted and
85///    then has not been modified since
86/// 2) In history, not in snapshot - occurs when a new object is created
87/// 3) In snapshot and in history - occurs when an object is snapshotted and
88///    further modified
89///
90/// Additionally, even among objects that satisfy the filtering criteria, it is
91/// possible that there is a yet more recent version of the object within the
92/// checkpoint range, such as when the owner of an object changes. The `LEFT
93/// JOIN` against the `objects_history` table handles this and scenario 3. Note
94/// that the implementation applies the `LEFT JOIN` to each inner query in
95/// conjunction with the `page`'s cursor and limit. If this was instead done
96/// once at the end, the query would be drastically inefficient as we would be
97/// dealing with a large number of rows from `objects_snapshot`, and potentially
98/// `objects_history` as the checkpoint range grows. Instead, the `LEFT JOIN`
99/// and limit applied on the inner queries work in conjunction to make the final
100/// query noticeably more efficient. The former serves as a filter, and the
101/// latter reduces the number of rows that the database needs to work with.
102///
103/// However, not all queries require this `LEFT JOIN`, such as when no filtering
104/// criteria is specified, or if the filter is a lookup at a specific
105/// `object_id` and `object_version`. This is controlled by the `view`
106/// parameter. If the `view` parameter is set to `Consistent`, this filter
107/// is applied, otherwise if the `view` parameter is set to `Historical`, this
108/// filter is not applied.
109///
110/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION
111/// ALL` instead of `UNION`; the latter incurs significant overhead as it
112/// additionally de-duplicates records from both sources. This dedupe is
113/// unnecessary, since we have the fragment `SELECT DISTINCT ON (object_id) ...
114/// ORDER BY object_id, object_version DESC`. This is also redundant for the
115/// most part, due to the invariant that the `objects_history` captures changes
116/// that occur after `objects_snapshot`, but it's a safeguard to handle any
117/// possible overlap during snapshot creation.
118pub(crate) fn build_objects_query(
119    view: View,
120    range: AvailableRange,
121    page: &Page<Cursor>,
122    filter_fn: impl Fn(RawQuery) -> RawQuery,
123    newer_criteria: impl Fn(RawQuery) -> RawQuery,
124) -> RawQuery {
125    let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
126    snapshot_objs_inner = filter_fn(snapshot_objs_inner);
127
128    let mut snapshot_objs = match view {
129        View::Consistent => {
130            // Subquery to be used in `LEFT JOIN` for more recent object versions
131            let newer = newer_criteria(filter!(
132                query!("SELECT object_id, object_version FROM objects_history"),
133                format!(
134                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
135                    range.first, range.last
136                )
137            ));
138
139            // The `LEFT JOIN` serves as a filter to remove objects that have a more recent
140            // version
141            let mut snapshot_objs = query!(
142                r#"SELECT candidates.* FROM ({}) candidates
143                    LEFT JOIN ({}) newer
144                    ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
145                snapshot_objs_inner,
146                newer
147            );
148            snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
149            snapshot_objs
150        }
151        View::Historical => {
152            // The cursor pagination logic refers to the table with the `candidates` alias
153            query!(
154                "SELECT candidates.* FROM ({}) candidates",
155                snapshot_objs_inner
156            )
157        }
158    };
159
160    // Always apply cursor pagination and limit to constrain the number of rows
161    // returned, ensure that the inner queries are in step, and to handle the
162    // scenario where a user provides more `objectKeys` than allowed by the
163    // maximum page size.
164    snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
165
166    // Similar to the snapshot query, construct the filtered inner query for the
167    // history table.
168    let mut history_window = query!("SELECT * FROM objects_history");
169    history_window = filter_fn(history_window);
170
171    let mut history_objs = match view {
172        View::Consistent => {
173            // Additionally bound the inner `objects_history` query by the checkpoint range
174            history_window = filter!(
175                history_window,
176                format!(
177                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
178                    range.first, range.last
179                )
180            );
181
182            let newest = newer_criteria(filter!(
183                query!("SELECT object_id, MAX(object_version) AS max_version FROM objects_history"),
184                format!(
185                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
186                    range.first, range.last
187                )
188            ))
189            .group_by("object_id");
190
191            let history_objs = query!(
192                r#"WITH history_window AS ({}),
193                    newest AS ({})
194                    SELECT candidates.* FROM history_window candidates
195                    JOIN newest
196                    ON candidates.object_id = newest.object_id
197                    AND candidates.object_version = newest.max_version"#,
198                history_window,
199                newest
200            );
201            history_objs
202        }
203        View::Historical => {
204            // The cursor pagination logic refers to the table with the `candidates` alias
205            query!("SELECT candidates.* FROM ({}) candidates", history_window)
206        }
207    };
208
209    // Always apply cursor pagination and limit to constrain the number of rows
210    // returned, ensure that the inner queries are in step, and to handle the
211    // scenario where a user provides more `objectKeys` than allowed by the
212    // maximum page size.
213    history_objs = page.apply::<StoredHistoryObject>(history_objs);
214
215    // Combine the two queries, and select the most recent version of each object.
216    // The result set is the most recent version of objects from
217    // `objects_snapshot` and `objects_history` that match the filter criteria.
218    let query = query!(
219        r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
220        snapshot_objs,
221        history_objs
222    )
223    .order_by("object_id")
224    .order_by("object_version DESC");
225
226    query!("SELECT * FROM ({}) candidates", query)
227}