iota_graphql_rpc/
consistency.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use async_graphql::connection::CursorType;
6use iota_indexer::models::objects::StoredHistoryObject;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    filter, query,
11    raw_query::RawQuery,
12    types::{
13        available_range::AvailableRange,
14        cursor::{JsonCursor, Page, ScanLimited},
15        object::Cursor,
16    },
17};
18
/// Selects how `build_objects_query` treats objects that have a more recent
/// version inside the queried checkpoint range.
///
/// The type is a plain, field-less marker, so it derives `Debug`, `PartialEq`
/// and `Eq` in addition to `Copy`/`Clone`; this lets callers log it and
/// compare it directly (for example in assertions).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum View {
    /// Return objects that fulfill the filtering criteria, even if there are
    /// more recent versions of the object within the checkpoint range. This
    /// is used for lookups such as by `object_id` and `version`.
    Historical,
    /// Return objects that fulfill the filtering criteria and are the most
    /// recent version within the checkpoint range.
    Consistent,
}
29
30/// The consistent cursor for an index into a `Vec` field is constructed from
31/// the index of the element and the checkpoint the cursor was constructed at.
32#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
33pub(crate) struct ConsistentIndexCursor {
34    #[serde(rename = "i")]
35    pub ix: usize,
36    /// The checkpoint sequence number at which the entity corresponding to this
37    /// cursor was viewed at.
38    pub c: u64,
39}
40
41/// The consistent cursor for an index into a `Map` field is constructed from
42/// the name or key of the element and the checkpoint the cursor was constructed
43/// at.
44#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
45pub(crate) struct ConsistentNamedCursor {
46    #[serde(rename = "n")]
47    pub name: String,
48    /// The checkpoint sequence number at which the entity corresponding to this
49    /// cursor was viewed at.
50    pub c: u64,
51}
52
53/// Trait for cursors that have a checkpoint sequence number associated with
54/// them.
55pub(crate) trait Checkpointed: CursorType {
56    fn checkpoint_viewed_at(&self) -> u64;
57}
58
59impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
60    fn checkpoint_viewed_at(&self) -> u64 {
61        self.c
62    }
63}
64
65impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
66    fn checkpoint_viewed_at(&self) -> u64 {
67        self.c
68    }
69}
70
71impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}
72
73impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
74
75/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history`
76/// table to fetch objects that satisfy some filtering criteria `filter_fn`
77/// within the provided checkpoint range `lhs` and `rhs`. The `objects_snapshot`
78/// table contains the latest versions of objects up to a checkpoint sequence
79/// number, and `objects_history` captures changes after that, so a query to
80/// both tables is necessary to handle these object states:
81/// 1) In snapshot, not in history - occurs when an object gets snapshotted and
82///    then has not been modified since
83/// 2) In history, not in snapshot - occurs when a new object is created
84/// 3) In snapshot and in history - occurs when an object is snapshotted and
85///    further modified
86///
87/// Additionally, even among objects that satisfy the filtering criteria, it is
88/// possible that there is a yet more recent version of the object within the
89/// checkpoint range, such as when the owner of an object changes. The `LEFT
90/// JOIN` against the `objects_history` table handles this and scenario 3. Note
91/// that the implementation applies the `LEFT JOIN` to each inner query in
92/// conjunction with the `page`'s cursor and limit. If this was instead done
93/// once at the end, the query would be drastically inefficient as we would be
94/// dealing with a large number of rows from `objects_snapshot`, and potentially
95/// `objects_history` as the checkpoint range grows. Instead, the `LEFT JOIN`
96/// and limit applied on the inner queries work in conjunction to make the final
97/// query noticeably more efficient. The former serves as a filter, and the
98/// latter reduces the number of rows that the database needs to work with.
99///
100/// However, not all queries require this `LEFT JOIN`, such as when no filtering
101/// criteria is specified, or if the filter is a lookup at a specific
102/// `object_id` and `object_version`. This is controlled by the `view`
103/// parameter. If the `view` parameter is set to `Consistent`, this filter
104/// is applied, otherwise if the `view` parameter is set to `Historical`, this
105/// filter is not applied.
106///
107/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION
108/// ALL` instead of `UNION`; the latter incurs significant overhead as it
109/// additionally de-duplicates records from both sources. This dedupe is
110/// unnecessary, since we have the fragment `SELECT DISTINCT ON (object_id) ...
111/// ORDER BY object_id, object_version DESC`. This is also redundant for the
112/// most part, due to the invariant that the `objects_history` captures changes
113/// that occur after `objects_snapshot`, but it's a safeguard to handle any
114/// possible overlap during snapshot creation.
115pub(crate) fn build_objects_query(
116    view: View,
117    range: AvailableRange,
118    page: &Page<Cursor>,
119    filter_fn: impl Fn(RawQuery) -> RawQuery,
120    newer_criteria: impl Fn(RawQuery) -> RawQuery,
121) -> RawQuery {
122    let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
123    snapshot_objs_inner = filter_fn(snapshot_objs_inner);
124
125    let mut snapshot_objs = match view {
126        View::Consistent => {
127            // Subquery to be used in `LEFT JOIN` for more recent object versions
128            let newer = newer_criteria(filter!(
129                query!("SELECT object_id, object_version FROM objects_history"),
130                format!(
131                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
132                    range.first, range.last
133                )
134            ));
135
136            // The `LEFT JOIN` serves as a filter to remove objects that have a more recent
137            // version
138            let mut snapshot_objs = query!(
139                r#"SELECT candidates.* FROM ({}) candidates
140                    LEFT JOIN ({}) newer
141                    ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
142                snapshot_objs_inner,
143                newer.clone()
144            );
145            snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
146            snapshot_objs
147        }
148        View::Historical => {
149            // The cursor pagination logic refers to the table with the `candidates` alias
150            query!(
151                "SELECT candidates.* FROM ({}) candidates",
152                snapshot_objs_inner
153            )
154        }
155    };
156
157    // Always apply cursor pagination and limit to constrain the number of rows
158    // returned, ensure that the inner queries are in step, and to handle the
159    // scenario where a user provides more `objectKeys` than allowed by the
160    // maximum page size.
161    snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
162
163    // Similar to the snapshot query, construct the filtered inner query for the
164    // history table.
165    let mut history_window = query!("SELECT * FROM objects_history");
166    history_window = filter_fn(history_window);
167
168    let mut history_objs = match view {
169        View::Consistent => {
170            // Additionally bound the inner `objects_history` query by the checkpoint range
171            history_window = filter!(
172                history_window,
173                format!(
174                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
175                    range.first, range.last
176                )
177            );
178
179            let newest = newer_criteria(filter!(
180                query!("SELECT object_id, MAX(object_version) AS max_version FROM objects_history"),
181                format!(
182                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
183                    range.first, range.last
184                )
185            ))
186            .group_by("object_id");
187
188            let history_objs = query!(
189                r#"WITH history_window AS ({}),
190                    newest AS ({})
191                    SELECT candidates.* FROM history_window candidates
192                    JOIN newest
193                    ON candidates.object_id = newest.object_id
194                    AND candidates.object_version = newest.max_version"#,
195                history_window,
196                newest
197            );
198            history_objs
199        }
200        View::Historical => {
201            // The cursor pagination logic refers to the table with the `candidates` alias
202            query!("SELECT candidates.* FROM ({}) candidates", history_window)
203        }
204    };
205
206    // Always apply cursor pagination and limit to constrain the number of rows
207    // returned, ensure that the inner queries are in step, and to handle the
208    // scenario where a user provides more `objectKeys` than allowed by the
209    // maximum page size.
210    history_objs = page.apply::<StoredHistoryObject>(history_objs);
211
212    // Combine the two queries, and select the most recent version of each object.
213    // The result set is the most recent version of objects from
214    // `objects_snapshot` and `objects_history` that match the filter criteria.
215    let query = query!(
216        r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
217        snapshot_objs,
218        history_objs
219    )
220    .order_by("object_id")
221    .order_by("object_version DESC");
222
223    query!("SELECT * FROM ({}) candidates", query)
224}