iota_graphql_rpc/consistency.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use async_graphql::connection::CursorType;
6use iota_indexer::models::objects::StoredHistoryObject;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 filter, query,
11 raw_query::RawQuery,
12 types::{
13 available_range::AvailableRange,
14 cursor::{JsonCursor, Page, ScanLimited},
15 object::Cursor,
16 },
17};
18
/// Sentinel checkpoint sequence number (`u64::MAX`) used for entities that
/// are not available for view.
pub(crate) const UNAVAILABLE_CHECKPOINT_SEQUENCE_NUMBER: u64 = u64::MAX;
21
/// Selects how object versions inside the checkpoint range are treated when
/// an objects query is built.
#[derive(Clone, Copy)]
pub(crate) enum View {
    /// Keep every object that satisfies the filtering criteria, even when a
    /// more recent version of that object exists within the checkpoint range.
    /// Used for lookups such as by `object_id` and `version`.
    Historical,
    /// Keep only objects that satisfy the filtering criteria and are also the
    /// most recent version within the checkpoint range.
    Consistent,
}
32
33/// The consistent cursor for an index into a `Vec` field is constructed from
34/// the index of the element and the checkpoint the cursor was constructed at.
35#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
36pub(crate) struct ConsistentIndexCursor {
37 #[serde(rename = "i")]
38 pub ix: usize,
39 /// The checkpoint sequence number at which the entity corresponding to this
40 /// cursor was viewed at.
41 pub c: u64,
42}
43
44/// The consistent cursor for an index into a `Map` field is constructed from
45/// the name or key of the element and the checkpoint the cursor was constructed
46/// at.
47#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
48pub(crate) struct ConsistentNamedCursor {
49 #[serde(rename = "n")]
50 pub name: String,
51 /// The checkpoint sequence number at which the entity corresponding to this
52 /// cursor was viewed at.
53 pub c: u64,
54}
55
56/// Trait for cursors that have a checkpoint sequence number associated with
57/// them.
58pub(crate) trait Checkpointed: CursorType {
59 fn checkpoint_viewed_at(&self) -> u64;
60}
61
62impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
63 fn checkpoint_viewed_at(&self) -> u64 {
64 self.c
65 }
66}
67
68impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
69 fn checkpoint_viewed_at(&self) -> u64 {
70 self.c
71 }
72}
73
74impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}
75
76impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
77
78/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history`
79/// table to fetch objects that satisfy some filtering criteria `filter_fn`
80/// within the provided checkpoint range `lhs` and `rhs`. The `objects_snapshot`
81/// table contains the latest versions of objects up to a checkpoint sequence
82/// number, and `objects_history` captures changes after that, so a query to
83/// both tables is necessary to handle these object states:
84/// 1) In snapshot, not in history - occurs when an object gets snapshotted and
85/// then has not been modified since
86/// 2) In history, not in snapshot - occurs when a new object is created
87/// 3) In snapshot and in history - occurs when an object is snapshotted and
88/// further modified
89///
90/// Additionally, even among objects that satisfy the filtering criteria, it is
91/// possible that there is a yet more recent version of the object within the
92/// checkpoint range, such as when the owner of an object changes. The `LEFT
93/// JOIN` against the `objects_history` table handles this and scenario 3. Note
94/// that the implementation applies the `LEFT JOIN` to each inner query in
95/// conjunction with the `page`'s cursor and limit. If this was instead done
96/// once at the end, the query would be drastically inefficient as we would be
97/// dealing with a large number of rows from `objects_snapshot`, and potentially
98/// `objects_history` as the checkpoint range grows. Instead, the `LEFT JOIN`
99/// and limit applied on the inner queries work in conjunction to make the final
100/// query noticeably more efficient. The former serves as a filter, and the
101/// latter reduces the number of rows that the database needs to work with.
102///
103/// However, not all queries require this `LEFT JOIN`, such as when no filtering
104/// criteria is specified, or if the filter is a lookup at a specific
105/// `object_id` and `object_version`. This is controlled by the `view`
106/// parameter. If the `view` parameter is set to `Consistent`, this filter
107/// is applied, otherwise if the `view` parameter is set to `Historical`, this
108/// filter is not applied.
109///
110/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION
111/// ALL` instead of `UNION`; the latter incurs significant overhead as it
112/// additionally de-duplicates records from both sources. This dedupe is
113/// unnecessary, since we have the fragment `SELECT DISTINCT ON (object_id) ...
114/// ORDER BY object_id, object_version DESC`. This is also redundant for the
115/// most part, due to the invariant that the `objects_history` captures changes
116/// that occur after `objects_snapshot`, but it's a safeguard to handle any
117/// possible overlap during snapshot creation.
118pub(crate) fn build_objects_query(
119 view: View,
120 range: AvailableRange,
121 page: &Page<Cursor>,
122 filter_fn: impl Fn(RawQuery) -> RawQuery,
123 newer_criteria: impl Fn(RawQuery) -> RawQuery,
124) -> RawQuery {
125 let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
126 snapshot_objs_inner = filter_fn(snapshot_objs_inner);
127
128 let mut snapshot_objs = match view {
129 View::Consistent => {
130 // Subquery to be used in `LEFT JOIN` for more recent object versions
131 let newer = newer_criteria(filter!(
132 query!("SELECT object_id, object_version FROM objects_history"),
133 format!(
134 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
135 range.first, range.last
136 )
137 ));
138
139 // The `LEFT JOIN` serves as a filter to remove objects that have a more recent
140 // version
141 let mut snapshot_objs = query!(
142 r#"SELECT candidates.* FROM ({}) candidates
143 LEFT JOIN ({}) newer
144 ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
145 snapshot_objs_inner,
146 newer
147 );
148 snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
149 snapshot_objs
150 }
151 View::Historical => {
152 // The cursor pagination logic refers to the table with the `candidates` alias
153 query!(
154 "SELECT candidates.* FROM ({}) candidates",
155 snapshot_objs_inner
156 )
157 }
158 };
159
160 // Always apply cursor pagination and limit to constrain the number of rows
161 // returned, ensure that the inner queries are in step, and to handle the
162 // scenario where a user provides more `objectKeys` than allowed by the
163 // maximum page size.
164 snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
165
166 // Similar to the snapshot query, construct the filtered inner query for the
167 // history table.
168 let mut history_window = query!("SELECT * FROM objects_history");
169 history_window = filter_fn(history_window);
170
171 let mut history_objs = match view {
172 View::Consistent => {
173 // Additionally bound the inner `objects_history` query by the checkpoint range
174 history_window = filter!(
175 history_window,
176 format!(
177 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
178 range.first, range.last
179 )
180 );
181
182 let newest = newer_criteria(filter!(
183 query!("SELECT object_id, MAX(object_version) AS max_version FROM objects_history"),
184 format!(
185 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
186 range.first, range.last
187 )
188 ))
189 .group_by("object_id");
190
191 let history_objs = query!(
192 r#"WITH history_window AS ({}),
193 newest AS ({})
194 SELECT candidates.* FROM history_window candidates
195 JOIN newest
196 ON candidates.object_id = newest.object_id
197 AND candidates.object_version = newest.max_version"#,
198 history_window,
199 newest
200 );
201 history_objs
202 }
203 View::Historical => {
204 // The cursor pagination logic refers to the table with the `candidates` alias
205 query!("SELECT candidates.* FROM ({}) candidates", history_window)
206 }
207 };
208
209 // Always apply cursor pagination and limit to constrain the number of rows
210 // returned, ensure that the inner queries are in step, and to handle the
211 // scenario where a user provides more `objectKeys` than allowed by the
212 // maximum page size.
213 history_objs = page.apply::<StoredHistoryObject>(history_objs);
214
215 // Combine the two queries, and select the most recent version of each object.
216 // The result set is the most recent version of objects from
217 // `objects_snapshot` and `objects_history` that match the filter criteria.
218 let query = query!(
219 r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
220 snapshot_objs,
221 history_objs
222 )
223 .order_by("object_id")
224 .order_by("object_version DESC");
225
226 query!("SELECT * FROM ({}) candidates", query)
227}