iota_indexer/
system_package_task.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
8use iota_types::SYSTEM_PACKAGE_ADDRESSES;
9use tokio_util::sync::CancellationToken;
10
11use crate::{indexer_reader::IndexerReader, schema::epochs, store::diesel_macro::*};
12
13/// Background task responsible for evicting system packages from the package
14/// resolver's cache after detecting an epoch boundary.
15pub(crate) struct SystemPackageTask {
16    /// Holds the DB connection and also the package resolver to evict packages
17    /// from.
18    reader: IndexerReader,
19    /// Signal to cancel the task.
20    cancel: CancellationToken,
21    /// Interval to sleep for between checks.
22    interval: Duration,
23}
24
25impl SystemPackageTask {
26    pub(crate) fn new(
27        reader: IndexerReader,
28        cancel: CancellationToken,
29        interval: Duration,
30    ) -> Self {
31        Self {
32            reader,
33            cancel,
34            interval,
35        }
36    }
37
38    pub(crate) async fn run(&self) {
39        let mut last_epoch: i64 = 0;
40        loop {
41            tokio::select! {
42                _ = self.cancel.cancelled() => {
43                    tracing::info!(
44                        "Shutdown signal received, terminating system package eviction task"
45                    );
46                    return;
47                }
48                _ = tokio::time::sleep(self.interval) => {
49                    let pool = self.reader.get_pool();
50                    let next_epoch = match run_query_async!(&pool, move |conn| {
51                            epochs::dsl::epochs
52                                .select(epochs::dsl::epoch)
53                                .order_by(epochs::epoch.desc())
54                                .first::<i64>(conn)
55                        }) {
56                        Ok(epoch) => epoch,
57                        Err(e) => {
58                            tracing::error!("Failed to fetch latest epoch: {:?}", e);
59                            continue;
60                        }
61                    };
62
63                    if next_epoch > last_epoch {
64                        last_epoch = next_epoch;
65                        tracing::info!(
66                            "Detected epoch boundary, evicting system packages from cache"
67                        );
68                        self.reader
69                            .package_resolver()
70                            .package_store()
71                            .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
72                    }
73                }
74            }
75        }
76    }
77}