iota_rest_api/
reader.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_sdk_types::{
8    CheckpointSequenceNumber, EpochId, Object, ObjectId, SignedTransaction, ValidatorCommittee,
9    Version,
10};
11use iota_types::storage::{
12    ObjectStore, RestStateReader,
13    error::{Error as StorageError, Result},
14};
15use tap::Pipe;
16
17use crate::Direction;
18
19#[derive(Clone)]
20pub struct StateReader {
21    inner: Arc<dyn RestStateReader>,
22}
23
24impl StateReader {
25    pub fn new(inner: Arc<dyn RestStateReader>) -> Self {
26        Self { inner }
27    }
28
29    pub fn inner(&self) -> &Arc<dyn RestStateReader> {
30        &self.inner
31    }
32
33    pub fn get_object(&self, object_id: ObjectId) -> crate::Result<Option<Object>> {
34        self.inner
35            .try_get_object(&object_id.into())
36            .map_err(Into::into)
37            .and_then(|maybe| maybe.map(TryInto::try_into).transpose().map_err(Into::into))
38    }
39
40    pub fn get_object_with_version(
41        &self,
42        object_id: ObjectId,
43        version: Version,
44    ) -> crate::Result<Option<Object>> {
45        self.inner
46            .try_get_object_by_key(&object_id.into(), version.into())
47            .map_err(Into::into)
48            .and_then(|maybe| maybe.map(TryInto::try_into).transpose().map_err(Into::into))
49    }
50
51    pub fn try_get_committee(&self, epoch: EpochId) -> Result<Option<ValidatorCommittee>> {
52        self.inner
53            .try_get_committee(epoch)
54            .map(|maybe| maybe.map(|committee| (*committee).clone().into()))
55    }
56
57    pub fn get_system_state_summary(&self) -> Result<super::system::SystemStateSummary> {
58        use iota_types::iota_system_state::{
59            IotaSystemStateTrait, iota_system_state_summary::IotaSystemStateSummaryV2,
60        };
61
62        let system_state = iota_types::iota_system_state::get_iota_system_state(self.inner())
63            .map_err(StorageError::custom)?;
64        let summary =
65            IotaSystemStateSummaryV2::try_from(system_state.into_iota_system_state_summary())
66                .map_err(StorageError::custom)?;
67
68        Ok(summary.into())
69    }
70
71    pub fn get_transaction(
72        &self,
73        digest: iota_sdk_types::Digest,
74    ) -> crate::Result<(
75        iota_sdk_types::SignedTransaction,
76        iota_sdk_types::TransactionEffects,
77        Option<iota_sdk_types::TransactionEvents>,
78    )> {
79        use iota_types::effects::TransactionEffectsAPI;
80
81        use super::transactions::TransactionNotFoundError;
82
83        let transaction_digest = digest.into();
84
85        let transaction = (*self
86            .inner()
87            .try_get_transaction(&transaction_digest)?
88            .ok_or(TransactionNotFoundError(digest))?)
89        .clone()
90        .into_inner();
91        let effects = self
92            .inner()
93            .try_get_transaction_effects(&transaction_digest)?
94            .ok_or(TransactionNotFoundError(digest))?;
95        let events = if let Some(event_digest) = effects.events_digest() {
96            self.inner()
97                .try_get_events(event_digest)?
98                .ok_or(TransactionNotFoundError(digest))?
99                .pipe(Some)
100        } else {
101            None
102        };
103
104        Ok((
105            transaction.try_into()?,
106            effects.try_into()?,
107            events.map(TryInto::try_into).transpose()?,
108        ))
109    }
110
111    pub fn get_transaction_checkpoint(
112        &self,
113        digest: &iota_types::digests::TransactionDigest,
114    ) -> Option<CheckpointSequenceNumber> {
115        self.inner()
116            .indexes()?
117            .get_transaction_info(digest)
118            .ok()?
119            .map(|info| info.checkpoint)
120    }
121
122    pub fn get_transaction_response(
123        &self,
124        digest: iota_sdk_types::Digest,
125    ) -> crate::Result<super::transactions::TransactionResponse> {
126        let (
127            SignedTransaction {
128                transaction,
129                signatures,
130            },
131            effects,
132            events,
133        ) = self.get_transaction(digest)?;
134
135        let checkpoint = self.get_transaction_checkpoint(&(digest.into()));
136        let timestamp_ms = if let Some(checkpoint) = checkpoint {
137            self.inner()
138                .try_get_checkpoint_by_sequence_number(checkpoint)?
139                .map(|checkpoint| checkpoint.timestamp_ms)
140        } else {
141            None
142        };
143
144        Ok(crate::transactions::TransactionResponse {
145            digest: transaction.digest(),
146            transaction,
147            signatures,
148            effects,
149            events,
150            checkpoint,
151            timestamp_ms,
152        })
153    }
154
155    pub fn checkpoint_iter(
156        &self,
157        direction: Direction,
158        start: CheckpointSequenceNumber,
159    ) -> CheckpointIter {
160        CheckpointIter::new(self.clone(), direction, start)
161    }
162
163    pub fn transaction_iter(
164        &self,
165        direction: Direction,
166        cursor: (CheckpointSequenceNumber, Option<usize>),
167    ) -> CheckpointTransactionsIter {
168        CheckpointTransactionsIter::new(self.clone(), direction, cursor)
169    }
170}
171
172pub struct CheckpointTransactionsIter {
173    reader: StateReader,
174    direction: Direction,
175
176    next_cursor: Option<(CheckpointSequenceNumber, Option<usize>)>,
177    checkpoint: Option<(
178        iota_types::messages_checkpoint::CheckpointSummary,
179        iota_types::messages_checkpoint::CheckpointContents,
180    )>,
181}
182
183impl CheckpointTransactionsIter {
184    pub fn new(
185        reader: StateReader,
186        direction: Direction,
187        start: (CheckpointSequenceNumber, Option<usize>),
188    ) -> Self {
189        Self {
190            reader,
191            direction,
192            next_cursor: Some(start),
193            checkpoint: None,
194        }
195    }
196}
197
198impl Iterator for CheckpointTransactionsIter {
199    type Item = Result<(CursorInfo, iota_types::digests::TransactionDigest)>;
200
201    fn next(&mut self) -> Option<Self::Item> {
202        loop {
203            let (current_checkpoint, transaction_index) = self.next_cursor?;
204
205            let (checkpoint, contents) = if let Some(checkpoint) = &self.checkpoint {
206                if checkpoint.0.sequence_number != current_checkpoint {
207                    self.checkpoint = None;
208                    continue;
209                } else {
210                    checkpoint
211                }
212            } else {
213                let checkpoint = match self
214                    .reader
215                    .inner()
216                    .try_get_checkpoint_by_sequence_number(current_checkpoint)
217                {
218                    Ok(Some(checkpoint)) => checkpoint,
219                    Ok(None) => return None,
220                    Err(e) => return Some(Err(e)),
221                };
222                let contents = match self
223                    .reader
224                    .inner()
225                    .try_get_checkpoint_contents_by_sequence_number(checkpoint.sequence_number)
226                {
227                    Ok(Some(contents)) => contents,
228                    Ok(None) => return None,
229                    Err(e) => return Some(Err(e)),
230                };
231
232                self.checkpoint = Some((checkpoint.into_inner().into_data(), contents));
233                self.checkpoint.as_ref().unwrap()
234            };
235
236            let index = transaction_index
237                .map(|idx| idx.clamp(0, contents.size().saturating_sub(1)))
238                .unwrap_or_else(|| match self.direction {
239                    Direction::Ascending => 0,
240                    Direction::Descending => contents.size().saturating_sub(1),
241                });
242
243            self.next_cursor = {
244                let next_index = match self.direction {
245                    Direction::Ascending => {
246                        let next_index = index + 1;
247                        if next_index >= contents.size() {
248                            None
249                        } else {
250                            Some(next_index)
251                        }
252                    }
253                    Direction::Descending => index.checked_sub(1),
254                };
255
256                let next_checkpoint = if next_index.is_some() {
257                    Some(current_checkpoint)
258                } else {
259                    match self.direction {
260                        Direction::Ascending => current_checkpoint.checked_add(1),
261                        Direction::Descending => current_checkpoint.checked_sub(1),
262                    }
263                };
264
265                next_checkpoint.map(|checkpoint| (checkpoint, next_index))
266            };
267
268            if contents.size() == 0 {
269                continue;
270            }
271
272            let digest = contents.inner()[index].transaction;
273
274            let cursor_info = CursorInfo {
275                checkpoint: checkpoint.sequence_number,
276                timestamp_ms: checkpoint.timestamp_ms,
277                index: index as u64,
278                next_cursor: self.next_cursor,
279            };
280
281            return Some(Ok((cursor_info, digest)));
282        }
283    }
284}
285
286pub struct CursorInfo {
287    pub checkpoint: CheckpointSequenceNumber,
288    pub timestamp_ms: u64,
289    #[expect(unused)]
290    pub index: u64,
291
292    // None if there are no more transactions in the store
293    pub next_cursor: Option<(CheckpointSequenceNumber, Option<usize>)>,
294}
295
296pub struct CheckpointIter {
297    reader: StateReader,
298    direction: Direction,
299
300    next_cursor: Option<CheckpointSequenceNumber>,
301}
302
303impl CheckpointIter {
304    pub fn new(reader: StateReader, direction: Direction, start: CheckpointSequenceNumber) -> Self {
305        Self {
306            reader,
307            direction,
308            next_cursor: Some(start),
309        }
310    }
311}
312
313impl Iterator for CheckpointIter {
314    type Item = Result<(
315        iota_types::messages_checkpoint::CertifiedCheckpointSummary,
316        iota_types::messages_checkpoint::CheckpointContents,
317    )>;
318
319    fn next(&mut self) -> Option<Self::Item> {
320        let current_checkpoint = self.next_cursor?;
321
322        let checkpoint = match self
323            .reader
324            .inner()
325            .try_get_checkpoint_by_sequence_number(current_checkpoint)
326        {
327            Ok(Some(checkpoint)) => checkpoint,
328            Ok(None) => return None,
329            Err(e) => return Some(Err(e)),
330        }
331        .into_inner();
332        let contents = match self
333            .reader
334            .inner()
335            .try_get_checkpoint_contents_by_sequence_number(checkpoint.sequence_number)
336        {
337            Ok(Some(contents)) => contents,
338            Ok(None) => return None,
339            Err(e) => return Some(Err(e)),
340        };
341
342        self.next_cursor = match self.direction {
343            Direction::Ascending => current_checkpoint.checked_add(1),
344            Direction::Descending => current_checkpoint.checked_sub(1),
345        };
346
347        Some(Ok((checkpoint, contents)))
348    }
349}