iota_graphql_rpc/consistency.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use async_graphql::connection::CursorType;
6use iota_indexer::models::objects::StoredHistoryObject;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 filter, query,
11 raw_query::RawQuery,
12 types::{
13 available_range::AvailableRange,
14 cursor::{JsonCursor, Page, ScanLimited},
15 object::Cursor,
16 },
17};
18
/// Selects which versions of an object a query is allowed to return.
#[derive(Clone, Copy)]
pub(crate) enum View {
    /// Every object version that satisfies the filtering criteria is eligible,
    /// even when a newer version of the same object exists within the
    /// checkpoint range. Suited to point lookups, e.g. by `object_id` and
    /// `version`.
    Historical,
    /// Only objects that satisfy the filtering criteria AND are the newest
    /// version within the checkpoint range are eligible.
    Consistent,
}
29
30/// The consistent cursor for an index into a `Vec` field is constructed from
31/// the index of the element and the checkpoint the cursor was constructed at.
32#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
33pub(crate) struct ConsistentIndexCursor {
34 #[serde(rename = "i")]
35 pub ix: usize,
36 /// The checkpoint sequence number at which the entity corresponding to this
37 /// cursor was viewed at.
38 pub c: u64,
39}
40
41/// The consistent cursor for an index into a `Map` field is constructed from
42/// the name or key of the element and the checkpoint the cursor was constructed
43/// at.
44#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
45pub(crate) struct ConsistentNamedCursor {
46 #[serde(rename = "n")]
47 pub name: String,
48 /// The checkpoint sequence number at which the entity corresponding to this
49 /// cursor was viewed at.
50 pub c: u64,
51}
52
53/// Trait for cursors that have a checkpoint sequence number associated with
54/// them.
55pub(crate) trait Checkpointed: CursorType {
56 fn checkpoint_viewed_at(&self) -> u64;
57}
58
59impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
60 fn checkpoint_viewed_at(&self) -> u64 {
61 self.c
62 }
63}
64
65impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
66 fn checkpoint_viewed_at(&self) -> u64 {
67 self.c
68 }
69}
70
71impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}
72
73impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
74
75/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history`
76/// table to fetch objects that satisfy some filtering criteria `filter_fn`
77/// within the provided checkpoint range `lhs` and `rhs`. The `objects_snapshot`
78/// table contains the latest versions of objects up to a checkpoint sequence
79/// number, and `objects_history` captures changes after that, so a query to
80/// both tables is necessary to handle these object states:
81/// 1) In snapshot, not in history - occurs when an object gets snapshotted and
82/// then has not been modified since
83/// 2) In history, not in snapshot - occurs when a new object is created
84/// 3) In snapshot and in history - occurs when an object is snapshotted and
85/// further modified
86///
87/// Additionally, even among objects that satisfy the filtering criteria, it is
88/// possible that there is a yet more recent version of the object within the
89/// checkpoint range, such as when the owner of an object changes. The `LEFT
90/// JOIN` against the `objects_history` table handles this and scenario 3. Note
91/// that the implementation applies the `LEFT JOIN` to each inner query in
92/// conjunction with the `page`'s cursor and limit. If this was instead done
93/// once at the end, the query would be drastically inefficient as we would be
94/// dealing with a large number of rows from `objects_snapshot`, and potentially
95/// `objects_history` as the checkpoint range grows. Instead, the `LEFT JOIN`
96/// and limit applied on the inner queries work in conjunction to make the final
97/// query noticeably more efficient. The former serves as a filter, and the
98/// latter reduces the number of rows that the database needs to work with.
99///
100/// However, not all queries require this `LEFT JOIN`, such as when no filtering
101/// criteria is specified, or if the filter is a lookup at a specific
102/// `object_id` and `object_version`. This is controlled by the `view`
103/// parameter. If the `view` parameter is set to `Consistent`, this filter
104/// is applied, otherwise if the `view` parameter is set to `Historical`, this
105/// filter is not applied.
106///
107/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION
108/// ALL` instead of `UNION`; the latter incurs significant overhead as it
109/// additionally de-duplicates records from both sources. This dedupe is
110/// unnecessary, since we have the fragment `SELECT DISTINCT ON (object_id) ...
111/// ORDER BY object_id, object_version DESC`. This is also redundant for the
112/// most part, due to the invariant that the `objects_history` captures changes
113/// that occur after `objects_snapshot`, but it's a safeguard to handle any
114/// possible overlap during snapshot creation.
115pub(crate) fn build_objects_query(
116 view: View,
117 range: AvailableRange,
118 page: &Page<Cursor>,
119 filter_fn: impl Fn(RawQuery) -> RawQuery,
120 newer_criteria: impl Fn(RawQuery) -> RawQuery,
121) -> RawQuery {
122 let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
123 snapshot_objs_inner = filter_fn(snapshot_objs_inner);
124
125 let mut snapshot_objs = match view {
126 View::Consistent => {
127 // Subquery to be used in `LEFT JOIN` for more recent object versions
128 let newer = newer_criteria(filter!(
129 query!("SELECT object_id, object_version FROM objects_history"),
130 format!(
131 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
132 range.first, range.last
133 )
134 ));
135
136 // The `LEFT JOIN` serves as a filter to remove objects that have a more recent
137 // version
138 let mut snapshot_objs = query!(
139 r#"SELECT candidates.* FROM ({}) candidates
140 LEFT JOIN ({}) newer
141 ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
142 snapshot_objs_inner,
143 newer.clone()
144 );
145 snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
146 snapshot_objs
147 }
148 View::Historical => {
149 // The cursor pagination logic refers to the table with the `candidates` alias
150 query!(
151 "SELECT candidates.* FROM ({}) candidates",
152 snapshot_objs_inner
153 )
154 }
155 };
156
157 // Always apply cursor pagination and limit to constrain the number of rows
158 // returned, ensure that the inner queries are in step, and to handle the
159 // scenario where a user provides more `objectKeys` than allowed by the
160 // maximum page size.
161 snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
162
163 // Similar to the snapshot query, construct the filtered inner query for the
164 // history table.
165 let mut history_window = query!("SELECT * FROM objects_history");
166 history_window = filter_fn(history_window);
167
168 let mut history_objs = match view {
169 View::Consistent => {
170 // Additionally bound the inner `objects_history` query by the checkpoint range
171 history_window = filter!(
172 history_window,
173 format!(
174 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
175 range.first, range.last
176 )
177 );
178
179 let newest = newer_criteria(filter!(
180 query!("SELECT object_id, MAX(object_version) AS max_version FROM objects_history"),
181 format!(
182 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
183 range.first, range.last
184 )
185 ))
186 .group_by("object_id");
187
188 let history_objs = query!(
189 r#"WITH history_window AS ({}),
190 newest AS ({})
191 SELECT candidates.* FROM history_window candidates
192 JOIN newest
193 ON candidates.object_id = newest.object_id
194 AND candidates.object_version = newest.max_version"#,
195 history_window,
196 newest
197 );
198 history_objs
199 }
200 View::Historical => {
201 // The cursor pagination logic refers to the table with the `candidates` alias
202 query!("SELECT candidates.* FROM ({}) candidates", history_window)
203 }
204 };
205
206 // Always apply cursor pagination and limit to constrain the number of rows
207 // returned, ensure that the inner queries are in step, and to handle the
208 // scenario where a user provides more `objectKeys` than allowed by the
209 // maximum page size.
210 history_objs = page.apply::<StoredHistoryObject>(history_objs);
211
212 // Combine the two queries, and select the most recent version of each object.
213 // The result set is the most recent version of objects from
214 // `objects_snapshot` and `objects_history` that match the filter criteria.
215 let query = query!(
216 r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
217 snapshot_objs,
218 history_objs
219 )
220 .order_by("object_id")
221 .order_by("object_version DESC");
222
223 query!("SELECT * FROM ({}) candidates", query)
224}