iota_graphql_rpc/
consistency.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use async_graphql::connection::CursorType;
6use iota_indexer::models::objects::StoredHistoryObject;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    filter, query,
11    raw_query::RawQuery,
12    types::{
13        available_range::AvailableRange,
14        cursor::{JsonCursor, Page, ScanLimited},
15        object::Cursor,
16    },
17};
18
/// Selects how `build_objects_query` treats object versions found within the
/// checkpoint range.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum View {
    /// Return objects that fulfill the filtering criteria, even if there are
    /// more recent versions of the object within the checkpoint range. This
    /// is used for lookups such as by `object_id` and `version`.
    Historical,
    /// Return objects that fulfill the filtering criteria and are the most
    /// recent version within the checkpoint range.
    Consistent,
}
29
30/// The consistent cursor for an index into a `Vec` field is constructed from
31/// the index of the element and the checkpoint the cursor was constructed at.
32#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
33pub(crate) struct ConsistentIndexCursor {
34    #[serde(rename = "i")]
35    pub ix: usize,
36    /// The checkpoint sequence number at which the entity corresponding to this
37    /// cursor was viewed at.
38    pub c: u64,
39}
40
41/// The consistent cursor for an index into a `Map` field is constructed from
42/// the name or key of the element and the checkpoint the cursor was constructed
43/// at.
44#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
45pub(crate) struct ConsistentNamedCursor {
46    #[serde(rename = "n")]
47    pub name: String,
48    /// The checkpoint sequence number at which the entity corresponding to this
49    /// cursor was viewed at.
50    pub c: u64,
51}
52
53/// Trait for cursors that have a checkpoint sequence number associated with
54/// them.
55pub(crate) trait Checkpointed: CursorType {
56    fn checkpoint_viewed_at(&self) -> u64;
57}
58
59impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
60    fn checkpoint_viewed_at(&self) -> u64 {
61        self.c
62    }
63}
64
65impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
66    fn checkpoint_viewed_at(&self) -> u64 {
67        self.c
68    }
69}
70
71impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}
72
73impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
74
75/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history`
76/// table to fetch objects that satisfy some filtering criteria `filter_fn`
77/// within the provided checkpoint range `lhs` and `rhs`. The `objects_snapshot`
78/// table contains the latest versions of objects up to a checkpoint sequence
79/// number, and `objects_history` captures changes after that, so a query to
80/// both tables is necessary to handle these object states:
81/// 1) In snapshot, not in history - occurs when an object gets snapshotted and
82///    then has not been modified since
83/// 2) In history, not in snapshot - occurs when a new object is created
84/// 3) In snapshot and in history - occurs when an object is snapshotted and
85///    further modified
86///
87/// Additionally, even among objects that satisfy the filtering criteria, it is
88/// possible that there is a yet more recent version of the object within the
89/// checkpoint range, such as when the owner of an object changes. The `LEFT
90/// JOIN` against the `objects_history` table handles this and scenario 3. Note
91/// that the implementation applies the `LEFT JOIN` to each inner query in
92/// conjunction with the `page`'s cursor and limit. If this was instead done
93/// once at the end, the query would be drastically inefficient as we would be
94/// dealing with a large number of rows from `objects_snapshot`, and potentially
95/// `objects_history` as the checkpoint range grows. Instead, the `LEFT JOIN`
96/// and limit applied on the inner queries work in conjunction to make the final
97/// query noticeably more efficient. The former serves as a filter, and the
98/// latter reduces the number of rows that the database needs to work with.
99///
100/// However, not all queries require this `LEFT JOIN`, such as when no filtering
101/// criteria is specified, or if the filter is a lookup at a specific
102/// `object_id` and `object_version`. This is controlled by the `view`
103/// parameter. If the `view` parameter is set to `Consistent`, this filter
104/// is applied, otherwise if the `view` parameter is set to `Historical`, this
105/// filter is not applied.
106///
107/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION
108/// ALL` instead of `UNION`; the latter incurs significant overhead as it
109/// additionally de-duplicates records from both sources. This dedupe is
110/// unnecessary, since we have the fragment `SELECT DISTINCT ON (object_id) ...
111/// ORDER BY object_id, object_version DESC`. This is also redundant for the
112/// most part, due to the invariant that the `objects_history` captures changes
113/// that occur after `objects_snapshot`, but it's a safeguard to handle any
114/// possible overlap during snapshot creation.
115pub(crate) fn build_objects_query(
116    view: View,
117    range: AvailableRange,
118    page: &Page<Cursor>,
119    filter_fn: impl Fn(RawQuery) -> RawQuery,
120    newer_criteria: impl Fn(RawQuery) -> RawQuery,
121) -> RawQuery {
122    // Subquery to be used in `LEFT JOIN` against the inner queries for more recent
123    // object versions
124    let newer = newer_criteria(filter!(
125        query!("SELECT object_id, object_version FROM objects_history"),
126        format!(
127            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
128            range.first, range.last
129        )
130    ));
131
132    let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
133    snapshot_objs_inner = filter_fn(snapshot_objs_inner);
134
135    let mut snapshot_objs = match view {
136        View::Consistent => {
137            // The `LEFT JOIN` serves as a filter to remove objects that have a more recent
138            // version
139            let mut snapshot_objs = query!(
140                r#"SELECT candidates.* FROM ({}) candidates
141                    LEFT JOIN ({}) newer
142                    ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
143                snapshot_objs_inner,
144                newer.clone()
145            );
146            snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
147            snapshot_objs
148        }
149        View::Historical => {
150            // The cursor pagination logic refers to the table with the `candidates` alias
151            query!(
152                "SELECT candidates.* FROM ({}) candidates",
153                snapshot_objs_inner
154            )
155        }
156    };
157
158    // Always apply cursor pagination and limit to constrain the number of rows
159    // returned, ensure that the inner queries are in step, and to handle the
160    // scenario where a user provides more `objectKeys` than allowed by the
161    // maximum page size.
162    snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
163
164    // Similar to the snapshot query, construct the filtered inner query for the
165    // history table.
166    let mut history_objs_inner = query!("SELECT * FROM objects_history");
167    history_objs_inner = filter_fn(history_objs_inner);
168
169    let mut history_objs = match view {
170        View::Consistent => {
171            // Additionally bound the inner `objects_history` query by the checkpoint range
172            history_objs_inner = filter!(
173                history_objs_inner,
174                format!(
175                    r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
176                    range.first, range.last
177                )
178            );
179
180            let mut history_objs = query!(
181                r#"SELECT candidates.* FROM ({}) candidates
182                    LEFT JOIN ({}) newer
183                    ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
184                history_objs_inner,
185                newer
186            );
187            history_objs = filter!(history_objs, "newer.object_version IS NULL");
188            history_objs
189        }
190        View::Historical => {
191            // The cursor pagination logic refers to the table with the `candidates` alias
192            query!(
193                "SELECT candidates.* FROM ({}) candidates",
194                history_objs_inner
195            )
196        }
197    };
198
199    // Always apply cursor pagination and limit to constrain the number of rows
200    // returned, ensure that the inner queries are in step, and to handle the
201    // scenario where a user provides more `objectKeys` than allowed by the
202    // maximum page size.
203    history_objs = page.apply::<StoredHistoryObject>(history_objs);
204
205    // Combine the two queries, and select the most recent version of each object.
206    // The result set is the most recent version of objects from
207    // `objects_snapshot` and `objects_history` that match the filter criteria.
208    let query = query!(
209        r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
210        snapshot_objs,
211        history_objs
212    )
213    .order_by("object_id")
214    .order_by("object_version DESC");
215
216    query!("SELECT * FROM ({}) candidates", query)
217}