iota_graphql_rpc/consistency.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use async_graphql::connection::CursorType;
6use iota_indexer::models::objects::StoredHistoryObject;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 filter, query,
11 raw_query::RawQuery,
12 types::{
13 available_range::AvailableRange,
14 cursor::{JsonCursor, Page, ScanLimited},
15 object::Cursor,
16 },
17};
18
/// Selects how `build_objects_query` treats candidate objects that have a more
/// recent version within the queried checkpoint range.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum View {
    /// Return objects that fulfill the filtering criteria, even if there are
    /// more recent versions of the object within the checkpoint range. This
    /// is used for lookups such as by `object_id` and `version`.
    Historical,
    /// Return objects that fulfill the filtering criteria and are the most
    /// recent version within the checkpoint range.
    Consistent,
}
29
30/// The consistent cursor for an index into a `Vec` field is constructed from
31/// the index of the element and the checkpoint the cursor was constructed at.
32#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
33pub(crate) struct ConsistentIndexCursor {
34 #[serde(rename = "i")]
35 pub ix: usize,
36 /// The checkpoint sequence number at which the entity corresponding to this
37 /// cursor was viewed at.
38 pub c: u64,
39}
40
41/// The consistent cursor for an index into a `Map` field is constructed from
42/// the name or key of the element and the checkpoint the cursor was constructed
43/// at.
44#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
45pub(crate) struct ConsistentNamedCursor {
46 #[serde(rename = "n")]
47 pub name: String,
48 /// The checkpoint sequence number at which the entity corresponding to this
49 /// cursor was viewed at.
50 pub c: u64,
51}
52
53/// Trait for cursors that have a checkpoint sequence number associated with
54/// them.
55pub(crate) trait Checkpointed: CursorType {
56 fn checkpoint_viewed_at(&self) -> u64;
57}
58
59impl Checkpointed for JsonCursor<ConsistentIndexCursor> {
60 fn checkpoint_viewed_at(&self) -> u64 {
61 self.c
62 }
63}
64
65impl Checkpointed for JsonCursor<ConsistentNamedCursor> {
66 fn checkpoint_viewed_at(&self) -> u64 {
67 self.c
68 }
69}
70
// Both consistent cursor types opt into `ScanLimited` with an empty impl body,
// i.e. they rely on whatever default methods the trait provides.
impl ScanLimited for JsonCursor<ConsistentIndexCursor> {}

impl ScanLimited for JsonCursor<ConsistentNamedCursor> {}
74
75/// Constructs a `RawQuery` against the `objects_snapshot` and `objects_history`
76/// table to fetch objects that satisfy some filtering criteria `filter_fn`
77/// within the provided checkpoint range `lhs` and `rhs`. The `objects_snapshot`
78/// table contains the latest versions of objects up to a checkpoint sequence
79/// number, and `objects_history` captures changes after that, so a query to
80/// both tables is necessary to handle these object states:
81/// 1) In snapshot, not in history - occurs when an object gets snapshotted and
82/// then has not been modified since
83/// 2) In history, not in snapshot - occurs when a new object is created
84/// 3) In snapshot and in history - occurs when an object is snapshotted and
85/// further modified
86///
87/// Additionally, even among objects that satisfy the filtering criteria, it is
88/// possible that there is a yet more recent version of the object within the
89/// checkpoint range, such as when the owner of an object changes. The `LEFT
90/// JOIN` against the `objects_history` table handles this and scenario 3. Note
91/// that the implementation applies the `LEFT JOIN` to each inner query in
92/// conjunction with the `page`'s cursor and limit. If this was instead done
93/// once at the end, the query would be drastically inefficient as we would be
94/// dealing with a large number of rows from `objects_snapshot`, and potentially
95/// `objects_history` as the checkpoint range grows. Instead, the `LEFT JOIN`
96/// and limit applied on the inner queries work in conjunction to make the final
97/// query noticeably more efficient. The former serves as a filter, and the
98/// latter reduces the number of rows that the database needs to work with.
99///
100/// However, not all queries require this `LEFT JOIN`, such as when no filtering
101/// criteria is specified, or if the filter is a lookup at a specific
102/// `object_id` and `object_version`. This is controlled by the `view`
103/// parameter. If the `view` parameter is set to `Consistent`, this filter
104/// is applied, otherwise if the `view` parameter is set to `Historical`, this
105/// filter is not applied.
106///
107/// Finally, the two queries are merged together with `UNION ALL`. We use `UNION
108/// ALL` instead of `UNION`; the latter incurs significant overhead as it
109/// additionally de-duplicates records from both sources. This dedupe is
110/// unnecessary, since we have the fragment `SELECT DISTINCT ON (object_id) ...
111/// ORDER BY object_id, object_version DESC`. This is also redundant for the
112/// most part, due to the invariant that the `objects_history` captures changes
113/// that occur after `objects_snapshot`, but it's a safeguard to handle any
114/// possible overlap during snapshot creation.
115pub(crate) fn build_objects_query(
116 view: View,
117 range: AvailableRange,
118 page: &Page<Cursor>,
119 filter_fn: impl Fn(RawQuery) -> RawQuery,
120 newer_criteria: impl Fn(RawQuery) -> RawQuery,
121) -> RawQuery {
122 // Subquery to be used in `LEFT JOIN` against the inner queries for more recent
123 // object versions
124 let newer = newer_criteria(filter!(
125 query!("SELECT object_id, object_version FROM objects_history"),
126 format!(
127 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
128 range.first, range.last
129 )
130 ));
131
132 let mut snapshot_objs_inner = query!("SELECT * FROM objects_snapshot");
133 snapshot_objs_inner = filter_fn(snapshot_objs_inner);
134
135 let mut snapshot_objs = match view {
136 View::Consistent => {
137 // The `LEFT JOIN` serves as a filter to remove objects that have a more recent
138 // version
139 let mut snapshot_objs = query!(
140 r#"SELECT candidates.* FROM ({}) candidates
141 LEFT JOIN ({}) newer
142 ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
143 snapshot_objs_inner,
144 newer.clone()
145 );
146 snapshot_objs = filter!(snapshot_objs, "newer.object_version IS NULL");
147 snapshot_objs
148 }
149 View::Historical => {
150 // The cursor pagination logic refers to the table with the `candidates` alias
151 query!(
152 "SELECT candidates.* FROM ({}) candidates",
153 snapshot_objs_inner
154 )
155 }
156 };
157
158 // Always apply cursor pagination and limit to constrain the number of rows
159 // returned, ensure that the inner queries are in step, and to handle the
160 // scenario where a user provides more `objectKeys` than allowed by the
161 // maximum page size.
162 snapshot_objs = page.apply::<StoredHistoryObject>(snapshot_objs);
163
164 // Similar to the snapshot query, construct the filtered inner query for the
165 // history table.
166 let mut history_objs_inner = query!("SELECT * FROM objects_history");
167 history_objs_inner = filter_fn(history_objs_inner);
168
169 let mut history_objs = match view {
170 View::Consistent => {
171 // Additionally bound the inner `objects_history` query by the checkpoint range
172 history_objs_inner = filter!(
173 history_objs_inner,
174 format!(
175 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
176 range.first, range.last
177 )
178 );
179
180 let mut history_objs = query!(
181 r#"SELECT candidates.* FROM ({}) candidates
182 LEFT JOIN ({}) newer
183 ON (candidates.object_id = newer.object_id AND candidates.object_version < newer.object_version)"#,
184 history_objs_inner,
185 newer
186 );
187 history_objs = filter!(history_objs, "newer.object_version IS NULL");
188 history_objs
189 }
190 View::Historical => {
191 // The cursor pagination logic refers to the table with the `candidates` alias
192 query!(
193 "SELECT candidates.* FROM ({}) candidates",
194 history_objs_inner
195 )
196 }
197 };
198
199 // Always apply cursor pagination and limit to constrain the number of rows
200 // returned, ensure that the inner queries are in step, and to handle the
201 // scenario where a user provides more `objectKeys` than allowed by the
202 // maximum page size.
203 history_objs = page.apply::<StoredHistoryObject>(history_objs);
204
205 // Combine the two queries, and select the most recent version of each object.
206 // The result set is the most recent version of objects from
207 // `objects_snapshot` and `objects_history` that match the filter criteria.
208 let query = query!(
209 r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) candidates"#,
210 snapshot_objs,
211 history_objs
212 )
213 .order_by("object_id")
214 .order_by("object_version DESC");
215
216 query!("SELECT * FROM ({}) candidates", query)
217}