iota_rest_api/
reader.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_sdk2::types::{
8    CheckpointSequenceNumber, EpochId, Object, ObjectId, SignedTransaction, ValidatorCommittee,
9    Version,
10};
11use iota_types::storage::{
12    ObjectStore, RestStateReader,
13    error::{Error as StorageError, Result},
14};
15use tap::Pipe;
16
17use crate::Direction;
18
19#[derive(Clone)]
20pub struct StateReader {
21    inner: Arc<dyn RestStateReader>,
22}
23
24impl StateReader {
25    pub fn new(inner: Arc<dyn RestStateReader>) -> Self {
26        Self { inner }
27    }
28
29    pub fn inner(&self) -> &Arc<dyn RestStateReader> {
30        &self.inner
31    }
32
33    pub fn get_object(&self, object_id: ObjectId) -> crate::Result<Option<Object>> {
34        self.inner
35            .get_object(&object_id.into())
36            .map_err(Into::into)
37            .and_then(|maybe| maybe.map(TryInto::try_into).transpose().map_err(Into::into))
38    }
39
40    pub fn get_object_with_version(
41        &self,
42        object_id: ObjectId,
43        version: Version,
44    ) -> crate::Result<Option<Object>> {
45        self.inner
46            .get_object_by_key(&object_id.into(), version.into())
47            .map_err(Into::into)
48            .and_then(|maybe| maybe.map(TryInto::try_into).transpose().map_err(Into::into))
49    }
50
51    pub fn get_committee(&self, epoch: EpochId) -> Result<Option<ValidatorCommittee>> {
52        self.inner
53            .get_committee(epoch)
54            .map(|maybe| maybe.map(|committee| (*committee).clone().into()))
55    }
56
57    pub fn get_system_state_summary(&self) -> Result<super::system::SystemStateSummary> {
58        use iota_types::iota_system_state::{
59            IotaSystemStateTrait, iota_system_state_summary::IotaSystemStateSummaryV2,
60        };
61
62        let system_state = iota_types::iota_system_state::get_iota_system_state(self.inner())
63            .map_err(StorageError::custom)?;
64        let summary =
65            IotaSystemStateSummaryV2::try_from(system_state.into_iota_system_state_summary())
66                .map_err(StorageError::custom)?;
67
68        Ok(summary.into())
69    }
70
71    pub fn get_transaction(
72        &self,
73        digest: iota_sdk2::types::TransactionDigest,
74    ) -> crate::Result<(
75        iota_sdk2::types::SignedTransaction,
76        iota_sdk2::types::TransactionEffects,
77        Option<iota_sdk2::types::TransactionEvents>,
78    )> {
79        use iota_types::effects::TransactionEffectsAPI;
80
81        use super::transactions::TransactionNotFoundError;
82
83        let transaction_digest = digest.into();
84
85        let transaction = (*self
86            .inner()
87            .get_transaction(&transaction_digest)?
88            .ok_or(TransactionNotFoundError(digest))?)
89        .clone()
90        .into_inner();
91        let effects = self
92            .inner()
93            .get_transaction_effects(&transaction_digest)?
94            .ok_or(TransactionNotFoundError(digest))?;
95        let events = if let Some(event_digest) = effects.events_digest() {
96            self.inner()
97                .get_events(event_digest)?
98                .ok_or(TransactionNotFoundError(digest))?
99                .pipe(Some)
100        } else {
101            None
102        };
103
104        Ok((
105            transaction.try_into()?,
106            effects.try_into()?,
107            events.map(TryInto::try_into).transpose()?,
108        ))
109    }
110
111    pub fn get_transaction_response(
112        &self,
113        digest: iota_sdk2::types::TransactionDigest,
114    ) -> crate::Result<super::transactions::TransactionResponse> {
115        let (
116            SignedTransaction {
117                transaction,
118                signatures,
119            },
120            effects,
121            events,
122        ) = self.get_transaction(digest)?;
123
124        let checkpoint = self.inner().get_transaction_checkpoint(&(digest.into()))?;
125        let timestamp_ms = if let Some(checkpoint) = checkpoint {
126            self.inner()
127                .get_checkpoint_by_sequence_number(checkpoint)?
128                .map(|checkpoint| checkpoint.timestamp_ms)
129        } else {
130            None
131        };
132
133        Ok(crate::transactions::TransactionResponse {
134            digest: transaction.digest(),
135            transaction,
136            signatures,
137            effects,
138            events,
139            checkpoint,
140            timestamp_ms,
141        })
142    }
143
144    pub fn checkpoint_iter(
145        &self,
146        direction: Direction,
147        start: CheckpointSequenceNumber,
148    ) -> CheckpointIter {
149        CheckpointIter::new(self.clone(), direction, start)
150    }
151
152    pub fn transaction_iter(
153        &self,
154        direction: Direction,
155        cursor: (CheckpointSequenceNumber, Option<usize>),
156    ) -> CheckpointTransactionsIter {
157        CheckpointTransactionsIter::new(self.clone(), direction, cursor)
158    }
159}
160
161pub struct CheckpointTransactionsIter {
162    reader: StateReader,
163    direction: Direction,
164
165    next_cursor: Option<(CheckpointSequenceNumber, Option<usize>)>,
166    checkpoint: Option<(
167        iota_types::messages_checkpoint::CheckpointSummary,
168        iota_types::messages_checkpoint::CheckpointContents,
169    )>,
170}
171
172impl CheckpointTransactionsIter {
173    pub fn new(
174        reader: StateReader,
175        direction: Direction,
176        start: (CheckpointSequenceNumber, Option<usize>),
177    ) -> Self {
178        Self {
179            reader,
180            direction,
181            next_cursor: Some(start),
182            checkpoint: None,
183        }
184    }
185}
186
187impl Iterator for CheckpointTransactionsIter {
188    type Item = Result<(CursorInfo, iota_types::digests::TransactionDigest)>;
189
190    fn next(&mut self) -> Option<Self::Item> {
191        loop {
192            let (current_checkpoint, transaction_index) = self.next_cursor?;
193
194            let (checkpoint, contents) = if let Some(checkpoint) = &self.checkpoint {
195                if checkpoint.0.sequence_number != current_checkpoint {
196                    self.checkpoint = None;
197                    continue;
198                } else {
199                    checkpoint
200                }
201            } else {
202                let checkpoint = match self
203                    .reader
204                    .inner()
205                    .get_checkpoint_by_sequence_number(current_checkpoint)
206                {
207                    Ok(Some(checkpoint)) => checkpoint,
208                    Ok(None) => return None,
209                    Err(e) => return Some(Err(e)),
210                };
211                let contents = match self
212                    .reader
213                    .inner()
214                    .get_checkpoint_contents_by_sequence_number(checkpoint.sequence_number)
215                {
216                    Ok(Some(contents)) => contents,
217                    Ok(None) => return None,
218                    Err(e) => return Some(Err(e)),
219                };
220
221                self.checkpoint = Some((checkpoint.into_inner().into_data(), contents));
222                self.checkpoint.as_ref().unwrap()
223            };
224
225            let index = transaction_index
226                .map(|idx| idx.clamp(0, contents.size().saturating_sub(1)))
227                .unwrap_or_else(|| match self.direction {
228                    Direction::Ascending => 0,
229                    Direction::Descending => contents.size().saturating_sub(1),
230                });
231
232            self.next_cursor = {
233                let next_index = match self.direction {
234                    Direction::Ascending => {
235                        let next_index = index + 1;
236                        if next_index >= contents.size() {
237                            None
238                        } else {
239                            Some(next_index)
240                        }
241                    }
242                    Direction::Descending => index.checked_sub(1),
243                };
244
245                let next_checkpoint = if next_index.is_some() {
246                    Some(current_checkpoint)
247                } else {
248                    match self.direction {
249                        Direction::Ascending => current_checkpoint.checked_add(1),
250                        Direction::Descending => current_checkpoint.checked_sub(1),
251                    }
252                };
253
254                next_checkpoint.map(|checkpoint| (checkpoint, next_index))
255            };
256
257            if contents.size() == 0 {
258                continue;
259            }
260
261            let digest = contents.inner()[index].transaction;
262
263            let cursor_info = CursorInfo {
264                checkpoint: checkpoint.sequence_number,
265                timestamp_ms: checkpoint.timestamp_ms,
266                index: index as u64,
267                next_cursor: self.next_cursor,
268            };
269
270            return Some(Ok((cursor_info, digest)));
271        }
272    }
273}
274
275pub struct CursorInfo {
276    pub checkpoint: CheckpointSequenceNumber,
277    pub timestamp_ms: u64,
278    #[expect(unused)]
279    pub index: u64,
280
281    // None if there are no more transactions in the store
282    pub next_cursor: Option<(CheckpointSequenceNumber, Option<usize>)>,
283}
284
285pub struct CheckpointIter {
286    reader: StateReader,
287    direction: Direction,
288
289    next_cursor: Option<CheckpointSequenceNumber>,
290}
291
292impl CheckpointIter {
293    pub fn new(reader: StateReader, direction: Direction, start: CheckpointSequenceNumber) -> Self {
294        Self {
295            reader,
296            direction,
297            next_cursor: Some(start),
298        }
299    }
300}
301
302impl Iterator for CheckpointIter {
303    type Item = Result<(
304        iota_types::messages_checkpoint::CertifiedCheckpointSummary,
305        iota_types::messages_checkpoint::CheckpointContents,
306    )>;
307
308    fn next(&mut self) -> Option<Self::Item> {
309        let current_checkpoint = self.next_cursor?;
310
311        let checkpoint = match self
312            .reader
313            .inner()
314            .get_checkpoint_by_sequence_number(current_checkpoint)
315        {
316            Ok(Some(checkpoint)) => checkpoint,
317            Ok(None) => return None,
318            Err(e) => return Some(Err(e)),
319        }
320        .into_inner();
321        let contents = match self
322            .reader
323            .inner()
324            .get_checkpoint_contents_by_sequence_number(checkpoint.sequence_number)
325        {
326            Ok(Some(contents)) => contents,
327            Ok(None) => return None,
328            Err(e) => return Some(Err(e)),
329        };
330
331        self.next_cursor = match self.direction {
332            Direction::Ascending => current_checkpoint.checked_add(1),
333            Direction::Descending => current_checkpoint.checked_sub(1),
334        };
335
336        Some(Ok((checkpoint, contents)))
337    }
338}