1use std::{fmt, ops::Deref, vec};
6
7use async_graphql::{
8 connection::{CursorType, OpaqueCursor},
9 *,
10};
11use diesel::{
12 QueryDsl, QueryResult, QuerySource, deserialize::FromSqlRow, query_builder::QueryFragment,
13 query_dsl::LoadQuery, sql_types::Untyped,
14};
15use fastcrypto::encoding::{Base64, Encoding};
16use serde::{Serialize, de::DeserializeOwned};
17
18use crate::{
19 config::ServiceConfig,
20 consistency::{Checkpointed, ConsistentIndexCursor},
21 data::{Conn, DbConnection, DieselBackend, DieselConn, Query},
22 error::Error,
23 raw_query::RawQuery,
24};
25
26pub(crate) struct JsonCursor<C>(OpaqueCursor<C>);
28
29pub(crate) struct BcsCursor<C>(C);
31
32#[derive(Debug, Clone)]
35pub(crate) struct Page<C> {
36 after: Option<C>,
39
40 before: Option<C>,
43
44 limit: u64,
46
47 end: End,
51}
52
53#[derive(PartialEq, Eq, Debug, Clone, Copy)]
56pub(crate) enum End {
57 Front,
58 Back,
59}
60
61pub(crate) trait Paginated<C: CursorType>: Target<C> {
63 type Source: QuerySource;
64
65 fn filter_ge<ST, GB>(
68 cursor: &C,
69 query: Query<ST, Self::Source, GB>,
70 ) -> Query<ST, Self::Source, GB>;
71
72 fn filter_le<ST, GB>(
75 cursor: &C,
76 query: Query<ST, Self::Source, GB>,
77 ) -> Query<ST, Self::Source, GB>;
78
79 fn order<ST, GB>(asc: bool, query: Query<ST, Self::Source, GB>) -> Query<ST, Self::Source, GB>;
84}
85
86pub(crate) trait RawPaginated<C: CursorType>: Target<C> {
89 fn filter_ge(cursor: &C, query: RawQuery) -> RawQuery;
92
93 fn filter_le(cursor: &C, query: RawQuery) -> RawQuery;
96
97 fn order(asc: bool, query: RawQuery) -> RawQuery;
102}
103
104pub(crate) trait Target<C: CursorType> {
105 fn cursor(&self, checkpoint_viewed_at: u64) -> C;
108}
109
110pub(crate) trait ScanLimited: Clone + PartialEq {
112 fn is_scan_limited(&self) -> bool {
117 false
118 }
119
120 fn unlimited(&self) -> Self {
122 self.clone()
123 }
124}
125
126impl<C> JsonCursor<C> {
127 pub(crate) fn new(cursor: C) -> Self {
128 JsonCursor(OpaqueCursor(cursor))
129 }
130}
131
132impl<C> BcsCursor<C> {
133 pub(crate) fn new(cursor: C) -> Self {
134 BcsCursor(cursor)
135 }
136}
137
138impl<C> Page<C> {
139 pub(crate) fn from_params(
155 config: &ServiceConfig,
156 first: Option<u64>,
157 after: Option<C>,
158 last: Option<u64>,
159 before: Option<C>,
160 ) -> Result<Self> {
161 let limits = &config.limits;
162 let page = match (first, after, last, before) {
163 (Some(_), _, Some(_), _) => return Err(Error::CursorNoFirstLast.extend()),
164
165 (limit, after, None, before) => Page {
166 after,
167 before,
168 limit: limit.unwrap_or(limits.default_page_size as u64),
169 end: End::Front,
170 },
171
172 (None, after, Some(limit), before) => Page {
173 after,
174 before,
175 limit,
176 end: End::Back,
177 },
178 };
179
180 if page.limit > limits.max_page_size as u64 {
181 return Err(Error::PageTooLarge(page.limit, limits.max_page_size).extend());
182 }
183
184 Ok(page)
185 }
186
187 pub(crate) fn bounded(limit: u64) -> Self {
190 Page {
191 after: None,
192 before: None,
193 limit,
194 end: End::Front,
195 }
196 }
197
198 pub(crate) fn after(&self) -> Option<&C> {
199 self.after.as_ref()
200 }
201
202 pub(crate) fn before(&self) -> Option<&C> {
203 self.before.as_ref()
204 }
205
206 pub(crate) fn limit(&self) -> usize {
207 self.limit as usize
208 }
209
210 pub(crate) fn is_from_front(&self) -> bool {
211 matches!(self.end, End::Front)
212 }
213
214 pub(crate) fn end(&self) -> End {
215 self.end
216 }
217}
218
219impl<C> Page<C>
220where
221 C: Checkpointed,
222{
223 pub(crate) fn validate_cursor_consistency(&self) -> Result<Option<u64>, Error> {
228 match (self.after(), self.before()) {
229 (Some(after), Some(before)) => {
230 if after.checkpoint_viewed_at() == before.checkpoint_viewed_at() {
231 Ok(Some(after.checkpoint_viewed_at()))
232 } else {
233 Err(Error::Client(
234 "The provided cursors are taken from different checkpoints and cannot be used together in the same query."
235 .to_string(),
236 ))
237 }
238 }
239 (Some(cursor), None) | (None, Some(cursor)) => Ok(Some(cursor.checkpoint_viewed_at())),
242 (None, None) => Ok(None),
243 }
244 }
245}
246
247impl Page<JsonCursor<ConsistentIndexCursor>> {
248 #[allow(clippy::type_complexity)]
254 pub(crate) fn paginate_consistent_indices(
255 &self,
256 total: usize,
257 checkpoint_viewed_at: u64,
258 ) -> Result<
259 Option<(
260 bool,
261 bool,
262 u64,
263 impl Iterator<Item = JsonCursor<ConsistentIndexCursor>>,
264 )>,
265 Error,
266 > {
267 let cursor_viewed_at = self.validate_cursor_consistency()?;
268 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
269
270 let mut lo = self.after().map_or(0, |a| a.ix + 1);
271 let mut hi = self.before().map_or(total, |b| b.ix);
272
273 if hi <= lo {
274 return Ok(None);
275 } else if (hi - lo) > self.limit() {
276 if self.is_from_front() {
277 hi = lo + self.limit();
278 } else {
279 lo = hi - self.limit();
280 }
281 }
282
283 Ok(Some((
284 0 < lo,
285 hi < total,
286 checkpoint_viewed_at,
287 (lo..hi).map(move |ix| {
288 JsonCursor::new(ConsistentIndexCursor {
289 ix,
290 c: checkpoint_viewed_at,
291 })
292 }),
293 )))
294 }
295}
296
297impl<C: CursorType + ScanLimited + Eq + Clone + Send + Sync + 'static> Page<C> {
298 pub(crate) fn paginate_query<T, Q, ST, GB>(
309 &self,
310 conn: &mut Conn<'_>,
311 checkpoint_viewed_at: u64,
312 query: Q,
313 ) -> QueryResult<(bool, bool, impl Iterator<Item = T>)>
314 where
315 Q: Fn() -> Query<ST, T::Source, GB>,
316 Query<ST, T::Source, GB>: LoadQuery<'static, DieselConn, T>,
317 Query<ST, T::Source, GB>: QueryFragment<DieselBackend>,
318 <T as Paginated<C>>::Source: Send + 'static,
319 <<T as Paginated<C>>::Source as QuerySource>::FromClause: Send + 'static,
320 Q: Send + 'static,
321 T: Send + Paginated<C> + 'static,
322 ST: Send + 'static,
323 GB: Send + 'static,
324 {
325 let page = self.clone();
326 let query = move || {
327 let mut query = query();
328 if let Some(after) = page.after() {
329 query = T::filter_ge(after, query);
330 }
331
332 if let Some(before) = page.before() {
333 query = T::filter_le(before, query);
334 }
335
336 query = query.limit(page.limit() as i64 + 2);
338 T::order(page.is_from_front(), query)
339 };
340
341 let results: Vec<T> = if self.limit() == 0 {
342 vec![]
344 } else {
345 let mut results = conn.results(query)?;
346 if !self.is_from_front() {
347 results.reverse();
348 }
349 results
350 };
351
352 Ok(self.paginate_results(
353 results.first().map(|f| f.cursor(checkpoint_viewed_at)),
354 results.last().map(|l| l.cursor(checkpoint_viewed_at)),
355 results,
356 ))
357 }
358
359 pub(crate) fn paginate_raw_query<T>(
369 &self,
370 conn: &mut Conn<'_>,
371 checkpoint_viewed_at: u64,
372 query: RawQuery,
373 ) -> QueryResult<(bool, bool, impl Iterator<Item = T>)>
374 where
375 T: Send + RawPaginated<C> + FromSqlRow<Untyped, DieselBackend> + 'static,
376 {
377 let new_query = move || {
378 let query = self.apply::<T>(query.clone());
379 query.into_boxed()
380 };
381
382 let results: Vec<T> = if self.limit() == 0 {
383 vec![]
385 } else {
386 let mut results: Vec<T> = conn.results(new_query)?;
387 if !self.is_from_front() {
388 results.reverse();
389 }
390 results
391 };
392
393 Ok(self.paginate_results(
394 results.first().map(|f| f.cursor(checkpoint_viewed_at)),
395 results.last().map(|l| l.cursor(checkpoint_viewed_at)),
396 results,
397 ))
398 }
399
400 fn paginate_results<T>(
411 &self,
412 f_cursor: Option<C>,
413 l_cursor: Option<C>,
414 results: Vec<T>,
415 ) -> (bool, bool, impl Iterator<Item = T>)
416 where
417 T: Target<C> + Send + 'static,
418 {
419 let (prev, next, prefix, suffix) =
421 match (self.after(), f_cursor, l_cursor, self.before(), self.end) {
422 (_, None, _, _, _) | (_, _, None, _, _) => {
426 return (false, false, vec![].into_iter());
427 }
428
429 (Some(a), Some(f), _, _, End::Front) if f != *a && !a.is_scan_limited() => {
434 return (false, false, vec![].into_iter());
435 }
436
437 (_, _, Some(l), Some(b), End::Back) if l != *b && !b.is_scan_limited() => {
439 return (false, false, vec![].into_iter());
440 }
441
442 (after, Some(f), Some(l), before, End::Front) => {
449 let has_previous_page = after.is_some_and(|a| a.unlimited() == f);
450 let prefix = has_previous_page as usize;
451
452 let mut suffix = before.is_some_and(|b| b.unlimited() == l) as usize;
456 suffix += results.len().saturating_sub(self.limit() + prefix + suffix);
457 let has_next_page = suffix > 0;
458
459 (has_previous_page, has_next_page, prefix, suffix)
460 }
461
462 (after, Some(f), Some(l), before, End::Back) => {
464 let has_next_page = before.is_some_and(|b| b.unlimited() == l);
467 let suffix = has_next_page as usize;
468
469 let mut prefix = after.is_some_and(|a| a.unlimited() == f) as usize;
470 prefix += results.len().saturating_sub(self.limit() + prefix + suffix);
471 let has_previous_page = prefix > 0;
472
473 (has_previous_page, has_next_page, prefix, suffix)
474 }
475 };
476
477 if results.len() == prefix + suffix {
481 return (false, false, vec![].into_iter());
482 }
483
484 let mut results = results.into_iter();
487 if prefix > 0 {
488 results.nth(prefix - 1);
489 }
490 if suffix > 0 {
491 results.nth_back(suffix - 1);
492 }
493
494 (prev, next, results)
495 }
496
497 pub(crate) fn apply<T>(&self, mut query: RawQuery) -> RawQuery
498 where
499 T: RawPaginated<C>,
500 {
501 if let Some(after) = self.after() {
502 query = T::filter_ge(after, query);
503 }
504
505 if let Some(before) = self.before() {
506 query = T::filter_le(before, query);
507 }
508
509 query = T::order(self.is_from_front(), query);
510
511 query.limit(self.limit() as i64 + 2)
512 }
513}
514
515#[Scalar(name = "String", visible = false)]
516impl<C> ScalarType for JsonCursor<C>
517where
518 C: Send + Sync,
519 C: Serialize + DeserializeOwned,
520{
521 fn parse(value: Value) -> InputValueResult<Self> {
522 let Value::String(s) = value else {
523 return Err(InputValueError::expected_type(value));
524 };
525
526 Ok(JsonCursor(OpaqueCursor::decode_cursor(&s)?))
527 }
528
529 fn is_valid(value: &Value) -> bool {
532 matches!(value, Value::String(_))
533 }
534
535 fn to_value(&self) -> Value {
536 Value::String(self.0.encode_cursor())
537 }
538}
539
540#[Scalar(name = "String", visible = false)]
541impl<C> ScalarType for BcsCursor<C>
542where
543 C: Send + Sync,
544 C: Serialize + DeserializeOwned,
545{
546 fn parse(value: Value) -> InputValueResult<Self> {
547 let Value::String(s) = value else {
548 return Err(InputValueError::expected_type(value));
549 };
550
551 Ok(Self::decode_cursor(&s)?)
552 }
553
554 fn is_valid(value: &Value) -> bool {
557 matches!(value, Value::String(_))
558 }
559
560 fn to_value(&self) -> Value {
561 Value::String(self.encode_cursor())
562 }
563}
564
565impl<C> CursorType for JsonCursor<C>
568where
569 C: Send + Sync,
570 C: Serialize + DeserializeOwned,
571{
572 type Error = <OpaqueCursor<C> as CursorType>::Error;
573
574 fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
575 Ok(JsonCursor(OpaqueCursor::decode_cursor(s)?))
576 }
577
578 fn encode_cursor(&self) -> String {
579 self.0.encode_cursor()
580 }
581}
582
583impl<C> CursorType for BcsCursor<C>
584where
585 C: Send + Sync,
586 C: Serialize + DeserializeOwned,
587{
588 type Error = <OpaqueCursor<C> as CursorType>::Error;
589
590 fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
591 let data = Base64::decode(s)?;
592 Ok(Self(bcs::from_bytes(&data)?))
593 }
594
595 fn encode_cursor(&self) -> String {
596 let value = bcs::to_bytes(&self.0).unwrap_or_default();
597 Base64::encode(value)
598 }
599}
600
601impl<C> Deref for JsonCursor<C> {
602 type Target = C;
603
604 fn deref(&self) -> &Self::Target {
605 self.0.deref()
606 }
607}
608
609impl<C> Deref for BcsCursor<C> {
610 type Target = C;
611
612 fn deref(&self) -> &Self::Target {
613 &self.0
614 }
615}
616
617impl<C: fmt::Debug> fmt::Debug for JsonCursor<C> {
618 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
619 write!(f, "{:?}", *self.0)
620 }
621}
622
623impl<C: fmt::Debug> fmt::Debug for BcsCursor<C> {
624 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625 write!(f, "{:?}", self.0)
626 }
627}
628
629impl<C: Clone> Clone for JsonCursor<C> {
630 fn clone(&self) -> Self {
631 JsonCursor::new(self.0.0.clone())
632 }
633}
634
635impl<C: Clone> Clone for BcsCursor<C> {
636 fn clone(&self) -> Self {
637 BcsCursor::new(self.0.clone())
638 }
639}
640
641impl<C: PartialEq> PartialEq for JsonCursor<C> {
642 fn eq(&self, other: &Self) -> bool {
643 self.deref() == other.deref()
644 }
645}
646
647impl<C: PartialEq> PartialEq for BcsCursor<C> {
648 fn eq(&self, other: &Self) -> bool {
649 self.0 == other.0
650 }
651}
652
653impl<C: Eq> Eq for JsonCursor<C> {}
654impl<C: Eq> Eq for BcsCursor<C> {}
655
656#[cfg(test)]
657mod tests {
658 use expect_test::expect;
659
660 use super::*;
661
662 #[test]
663 fn test_default_page() {
664 let config = ServiceConfig::default();
665 let page: Page<JsonCursor<u64>> =
666 Page::from_params(&config, None, None, None, None).unwrap();
667
668 let expect = expect![[r#"
669 Page {
670 after: None,
671 before: None,
672 limit: 20,
673 end: Front,
674 }"#]];
675 expect.assert_eq(&format!("{page:#?}"));
676 }
677
678 #[test]
679 fn test_prefix_page() {
680 let config = ServiceConfig::default();
681 let page: Page<JsonCursor<u64>> =
682 Page::from_params(&config, None, Some(JsonCursor::new(42)), None, None).unwrap();
683
684 let expect = expect![[r#"
685 Page {
686 after: Some(
687 42,
688 ),
689 before: None,
690 limit: 20,
691 end: Front,
692 }"#]];
693 expect.assert_eq(&format!("{page:#?}"));
694 }
695
696 #[test]
697 fn test_prefix_page_limited() {
698 let config = ServiceConfig::default();
699 let page: Page<JsonCursor<u64>> =
700 Page::from_params(&config, Some(10), Some(JsonCursor::new(42)), None, None).unwrap();
701
702 let expect = expect![[r#"
703 Page {
704 after: Some(
705 42,
706 ),
707 before: None,
708 limit: 10,
709 end: Front,
710 }"#]];
711 expect.assert_eq(&format!("{page:#?}"));
712 }
713
714 #[test]
715 fn test_suffix_page() {
716 let config = ServiceConfig::default();
717 let page: Page<JsonCursor<u64>> =
718 Page::from_params(&config, None, None, None, Some(JsonCursor::new(42))).unwrap();
719
720 let expect = expect![[r#"
721 Page {
722 after: None,
723 before: Some(
724 42,
725 ),
726 limit: 20,
727 end: Front,
728 }"#]];
729 expect.assert_eq(&format!("{page:#?}"));
730 }
731
732 #[test]
733 fn test_suffix_page_limited() {
734 let config = ServiceConfig::default();
735 let page: Page<JsonCursor<u64>> =
736 Page::from_params(&config, None, None, Some(10), Some(JsonCursor::new(42))).unwrap();
737
738 let expect = expect![[r#"
739 Page {
740 after: None,
741 before: Some(
742 42,
743 ),
744 limit: 10,
745 end: Back,
746 }"#]];
747 expect.assert_eq(&format!("{page:#?}"));
748 }
749
750 #[test]
751 fn test_between_page_prefix() {
752 let config = ServiceConfig::default();
753 let page: Page<JsonCursor<u64>> = Page::from_params(
754 &config,
755 Some(10),
756 Some(JsonCursor::new(40)),
757 None,
758 Some(JsonCursor::new(42)),
759 )
760 .unwrap();
761
762 let expect = expect![[r#"
763 Page {
764 after: Some(
765 40,
766 ),
767 before: Some(
768 42,
769 ),
770 limit: 10,
771 end: Front,
772 }"#]];
773 expect.assert_eq(&format!("{page:#?}"));
774 }
775
776 #[test]
777 fn test_between_page_suffix() {
778 let config = ServiceConfig::default();
779 let page: Page<JsonCursor<u64>> = Page::from_params(
780 &config,
781 None,
782 Some(JsonCursor::new(40)),
783 Some(10),
784 Some(JsonCursor::new(42)),
785 )
786 .unwrap();
787
788 let expect = expect![[r#"
789 Page {
790 after: Some(
791 40,
792 ),
793 before: Some(
794 42,
795 ),
796 limit: 10,
797 end: Back,
798 }"#]];
799 expect.assert_eq(&format!("{page:#?}"));
800 }
801
802 #[test]
803 fn test_between_page() {
804 let config = ServiceConfig::default();
805 let page: Page<JsonCursor<u64>> = Page::from_params(
806 &config,
807 None,
808 Some(JsonCursor::new(40)),
809 None,
810 Some(JsonCursor::new(42)),
811 )
812 .unwrap();
813
814 let expect = expect![[r#"
815 Page {
816 after: Some(
817 40,
818 ),
819 before: Some(
820 42,
821 ),
822 limit: 20,
823 end: Front,
824 }"#]];
825 expect.assert_eq(&format!("{page:#?}"));
826 }
827
828 #[test]
829 fn test_err_first_and_last() {
830 let config = ServiceConfig::default();
831 let err = Page::<JsonCursor<u64>>::from_params(&config, Some(1), None, Some(1), None)
832 .unwrap_err();
833
834 let expect = expect![[r#"
835 Error {
836 message: "'first' and 'last' must not be used together",
837 extensions: Some(
838 ErrorExtensionValues(
839 {
840 "code": String(
841 "BAD_USER_INPUT",
842 ),
843 },
844 ),
845 ),
846 }"#]];
847 expect.assert_eq(&format!("{err:#?}"));
848 }
849
850 #[test]
851 fn test_err_page_too_big() {
852 let config = ServiceConfig::default();
853 let too_big = config.limits.max_page_size as u64 + 1;
854 let err = Page::<JsonCursor<u64>>::from_params(&config, Some(too_big), None, None, None)
855 .unwrap_err();
856
857 let expect = expect![[r#"
858 Error {
859 message: "Connection's page size of 51 exceeds max of 50",
860 extensions: Some(
861 ErrorExtensionValues(
862 {
863 "code": String(
864 "BAD_USER_INPUT",
865 ),
866 },
867 ),
868 ),
869 }"#]];
870 expect.assert_eq(&format!("{err:#?}"));
871 }
872}