iota_indexer/
system_package_task.rs1use std::time::Duration;
6
7use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
8use iota_types::SYSTEM_PACKAGE_ADDRESSES;
9use tokio_util::sync::CancellationToken;
10
11use crate::{indexer_reader::IndexerReader, schema::epochs, store::diesel_macro::*};
12
/// Background task that periodically reads the latest epoch from the `epochs`
/// table and, when that epoch has advanced, evicts the system packages from
/// the package store reachable through the reader's package resolver.
pub(crate) struct SystemPackageTask {
    /// Supplies the connection pool used for the epoch query, and the package
    /// resolver whose package store gets evicted.
    reader: IndexerReader,
    /// Token awaited by `run`; once it is cancelled the task returns.
    cancel: CancellationToken,
    /// How long `run` sleeps between two consecutive epoch checks.
    interval: Duration,
}
24
25impl SystemPackageTask {
26 pub(crate) fn new(
27 reader: IndexerReader,
28 cancel: CancellationToken,
29 interval: Duration,
30 ) -> Self {
31 Self {
32 reader,
33 cancel,
34 interval,
35 }
36 }
37
38 pub(crate) async fn run(&self) {
39 let mut last_epoch: i64 = 0;
40 loop {
41 tokio::select! {
42 _ = self.cancel.cancelled() => {
43 tracing::info!(
44 "Shutdown signal received, terminating system package eviction task"
45 );
46 return;
47 }
48 _ = tokio::time::sleep(self.interval) => {
49 let pool = self.reader.get_pool();
50 let next_epoch = match run_query_async!(&pool, move |conn| {
51 epochs::dsl::epochs
52 .select(epochs::dsl::epoch)
53 .order_by(epochs::epoch.desc())
54 .first::<i64>(conn)
55 }) {
56 Ok(epoch) => epoch,
57 Err(e) => {
58 tracing::error!("Failed to fetch latest epoch: {:?}", e);
59 continue;
60 }
61 };
62
63 if next_epoch > last_epoch {
64 last_epoch = next_epoch;
65 tracing::info!(
66 "Detected epoch boundary, evicting system packages from cache"
67 );
68 self.reader
69 .package_resolver()
70 .package_store()
71 .evict(SYSTEM_PACKAGE_ADDRESSES.iter().copied());
72 }
73 }
74 }
75 }
76 }
77}