iota_rest_api/
reader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_sdk2::types::{
8    CheckpointSequenceNumber, EpochId, Object, ObjectId, SignedTransaction, ValidatorCommittee,
9    Version,
10};
11use iota_types::storage::{
12    ObjectStore, RestStateReader,
13    error::{Error as StorageError, Result},
14};
15use tap::Pipe;
16
17use crate::Direction;
18
19#[derive(Clone)]
20pub struct StateReader {
21    inner: Arc<dyn RestStateReader>,
22}
23
24impl StateReader {
25    pub fn new(inner: Arc<dyn RestStateReader>) -> Self {
26        Self { inner }
27    }
28
29    pub fn inner(&self) -> &Arc<dyn RestStateReader> {
30        &self.inner
31    }
32
33    pub fn get_object(&self, object_id: ObjectId) -> crate::Result<Option<Object>> {
34        self.inner
35            .try_get_object(&object_id.into())
36            .map_err(Into::into)
37            .and_then(|maybe| maybe.map(TryInto::try_into).transpose().map_err(Into::into))
38    }
39
40    pub fn get_object_with_version(
41        &self,
42        object_id: ObjectId,
43        version: Version,
44    ) -> crate::Result<Option<Object>> {
45        self.inner
46            .try_get_object_by_key(&object_id.into(), version.into())
47            .map_err(Into::into)
48            .and_then(|maybe| maybe.map(TryInto::try_into).transpose().map_err(Into::into))
49    }
50
51    pub fn try_get_committee(&self, epoch: EpochId) -> Result<Option<ValidatorCommittee>> {
52        self.inner
53            .try_get_committee(epoch)
54            .map(|maybe| maybe.map(|committee| (*committee).clone().into()))
55    }
56
57    pub fn get_system_state_summary(&self) -> Result<super::system::SystemStateSummary> {
58        use iota_types::iota_system_state::{
59            IotaSystemStateTrait, iota_system_state_summary::IotaSystemStateSummaryV2,
60        };
61
62        let system_state = iota_types::iota_system_state::get_iota_system_state(self.inner())
63            .map_err(StorageError::custom)?;
64        let summary =
65            IotaSystemStateSummaryV2::try_from(system_state.into_iota_system_state_summary())
66                .map_err(StorageError::custom)?;
67
68        Ok(summary.into())
69    }
70
71    pub fn get_transaction(
72        &self,
73        digest: iota_sdk2::types::TransactionDigest,
74    ) -> crate::Result<(
75        iota_sdk2::types::SignedTransaction,
76        iota_sdk2::types::TransactionEffects,
77        Option<iota_sdk2::types::TransactionEvents>,
78    )> {
79        use iota_types::effects::TransactionEffectsAPI;
80
81        use super::transactions::TransactionNotFoundError;
82
83        let transaction_digest = digest.into();
84
85        let transaction = (*self
86            .inner()
87            .try_get_transaction(&transaction_digest)?
88            .ok_or(TransactionNotFoundError(digest))?)
89        .clone()
90        .into_inner();
91        let effects = self
92            .inner()
93            .try_get_transaction_effects(&transaction_digest)?
94            .ok_or(TransactionNotFoundError(digest))?;
95        let events = if let Some(event_digest) = effects.events_digest() {
96            self.inner()
97                .try_get_events(event_digest)?
98                .ok_or(TransactionNotFoundError(digest))?
99                .pipe(Some)
100        } else {
101            None
102        };
103
104        Ok((
105            transaction.try_into()?,
106            effects.try_into()?,
107            events.map(TryInto::try_into).transpose()?,
108        ))
109    }
110
111    pub fn get_transaction_checkpoint(
112        &self,
113        digest: &iota_types::digests::TransactionDigest,
114    ) -> Option<CheckpointSequenceNumber> {
115        self.inner()
116            .indexes()?
117            .get_transaction_checkpoint(digest)
118            .ok()?
119    }
120
121    pub fn get_transaction_response(
122        &self,
123        digest: iota_sdk2::types::TransactionDigest,
124    ) -> crate::Result<super::transactions::TransactionResponse> {
125        let (
126            SignedTransaction {
127                transaction,
128                signatures,
129            },
130            effects,
131            events,
132        ) = self.get_transaction(digest)?;
133
134        let checkpoint = self.get_transaction_checkpoint(&(digest.into()));
135        let timestamp_ms = if let Some(checkpoint) = checkpoint {
136            self.inner()
137                .try_get_checkpoint_by_sequence_number(checkpoint)?
138                .map(|checkpoint| checkpoint.timestamp_ms)
139        } else {
140            None
141        };
142
143        Ok(crate::transactions::TransactionResponse {
144            digest: transaction.digest(),
145            transaction,
146            signatures,
147            effects,
148            events,
149            checkpoint,
150            timestamp_ms,
151        })
152    }
153
154    pub fn checkpoint_iter(
155        &self,
156        direction: Direction,
157        start: CheckpointSequenceNumber,
158    ) -> CheckpointIter {
159        CheckpointIter::new(self.clone(), direction, start)
160    }
161
162    pub fn transaction_iter(
163        &self,
164        direction: Direction,
165        cursor: (CheckpointSequenceNumber, Option<usize>),
166    ) -> CheckpointTransactionsIter {
167        CheckpointTransactionsIter::new(self.clone(), direction, cursor)
168    }
169}
170
171pub struct CheckpointTransactionsIter {
172    reader: StateReader,
173    direction: Direction,
174
175    next_cursor: Option<(CheckpointSequenceNumber, Option<usize>)>,
176    checkpoint: Option<(
177        iota_types::messages_checkpoint::CheckpointSummary,
178        iota_types::messages_checkpoint::CheckpointContents,
179    )>,
180}
181
182impl CheckpointTransactionsIter {
183    pub fn new(
184        reader: StateReader,
185        direction: Direction,
186        start: (CheckpointSequenceNumber, Option<usize>),
187    ) -> Self {
188        Self {
189            reader,
190            direction,
191            next_cursor: Some(start),
192            checkpoint: None,
193        }
194    }
195}
196
197impl Iterator for CheckpointTransactionsIter {
198    type Item = Result<(CursorInfo, iota_types::digests::TransactionDigest)>;
199
200    fn next(&mut self) -> Option<Self::Item> {
201        loop {
202            let (current_checkpoint, transaction_index) = self.next_cursor?;
203
204            let (checkpoint, contents) = if let Some(checkpoint) = &self.checkpoint {
205                if checkpoint.0.sequence_number != current_checkpoint {
206                    self.checkpoint = None;
207                    continue;
208                } else {
209                    checkpoint
210                }
211            } else {
212                let checkpoint = match self
213                    .reader
214                    .inner()
215                    .try_get_checkpoint_by_sequence_number(current_checkpoint)
216                {
217                    Ok(Some(checkpoint)) => checkpoint,
218                    Ok(None) => return None,
219                    Err(e) => return Some(Err(e)),
220                };
221                let contents = match self
222                    .reader
223                    .inner()
224                    .try_get_checkpoint_contents_by_sequence_number(checkpoint.sequence_number)
225                {
226                    Ok(Some(contents)) => contents,
227                    Ok(None) => return None,
228                    Err(e) => return Some(Err(e)),
229                };
230
231                self.checkpoint = Some((checkpoint.into_inner().into_data(), contents));
232                self.checkpoint.as_ref().unwrap()
233            };
234
235            let index = transaction_index
236                .map(|idx| idx.clamp(0, contents.size().saturating_sub(1)))
237                .unwrap_or_else(|| match self.direction {
238                    Direction::Ascending => 0,
239                    Direction::Descending => contents.size().saturating_sub(1),
240                });
241
242            self.next_cursor = {
243                let next_index = match self.direction {
244                    Direction::Ascending => {
245                        let next_index = index + 1;
246                        if next_index >= contents.size() {
247                            None
248                        } else {
249                            Some(next_index)
250                        }
251                    }
252                    Direction::Descending => index.checked_sub(1),
253                };
254
255                let next_checkpoint = if next_index.is_some() {
256                    Some(current_checkpoint)
257                } else {
258                    match self.direction {
259                        Direction::Ascending => current_checkpoint.checked_add(1),
260                        Direction::Descending => current_checkpoint.checked_sub(1),
261                    }
262                };
263
264                next_checkpoint.map(|checkpoint| (checkpoint, next_index))
265            };
266
267            if contents.size() == 0 {
268                continue;
269            }
270
271            let digest = contents.inner()[index].transaction;
272
273            let cursor_info = CursorInfo {
274                checkpoint: checkpoint.sequence_number,
275                timestamp_ms: checkpoint.timestamp_ms,
276                index: index as u64,
277                next_cursor: self.next_cursor,
278            };
279
280            return Some(Ok((cursor_info, digest)));
281        }
282    }
283}
284
285pub struct CursorInfo {
286    pub checkpoint: CheckpointSequenceNumber,
287    pub timestamp_ms: u64,
288    #[expect(unused)]
289    pub index: u64,
290
291    // None if there are no more transactions in the store
292    pub next_cursor: Option<(CheckpointSequenceNumber, Option<usize>)>,
293}
294
295pub struct CheckpointIter {
296    reader: StateReader,
297    direction: Direction,
298
299    next_cursor: Option<CheckpointSequenceNumber>,
300}
301
302impl CheckpointIter {
303    pub fn new(reader: StateReader, direction: Direction, start: CheckpointSequenceNumber) -> Self {
304        Self {
305            reader,
306            direction,
307            next_cursor: Some(start),
308        }
309    }
310}
311
312impl Iterator for CheckpointIter {
313    type Item = Result<(
314        iota_types::messages_checkpoint::CertifiedCheckpointSummary,
315        iota_types::messages_checkpoint::CheckpointContents,
316    )>;
317
318    fn next(&mut self) -> Option<Self::Item> {
319        let current_checkpoint = self.next_cursor?;
320
321        let checkpoint = match self
322            .reader
323            .inner()
324            .try_get_checkpoint_by_sequence_number(current_checkpoint)
325        {
326            Ok(Some(checkpoint)) => checkpoint,
327            Ok(None) => return None,
328            Err(e) => return Some(Err(e)),
329        }
330        .into_inner();
331        let contents = match self
332            .reader
333            .inner()
334            .try_get_checkpoint_contents_by_sequence_number(checkpoint.sequence_number)
335        {
336            Ok(Some(contents)) => contents,
337            Ok(None) => return None,
338            Err(e) => return Some(Err(e)),
339        };
340
341        self.next_cursor = match self.direction {
342            Direction::Ascending => current_checkpoint.checked_add(1),
343            Direction::Descending => current_checkpoint.checked_sub(1),
344        };
345
346        Some(Ok((checkpoint, contents)))
347    }
348}