iota_core/
subscription_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_json_rpc_types::{
8    EffectsWithInput, EventFilter, IotaEvent, IotaTransactionBlockEffects,
9    IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, TransactionFilter,
10};
11use iota_types::{error::IotaResult, transaction::TransactionData};
12use prometheus::{
13    IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
14    register_int_gauge_vec_with_registry,
15};
16use tokio_stream::Stream;
17use tracing::{error, instrument, trace};
18
19use crate::streamer::Streamer;
20
21#[cfg(test)]
22#[path = "unit_tests/subscription_handler_tests.rs"]
23mod subscription_handler_tests;
24
/// Buffer size passed as the first argument to `Streamer::spawn` for both the event
/// streamer and the transaction streamer created by `SubscriptionHandler::new`.
///
/// NOTE(review): presumably the capacity of each streamer's dispatch channel — confirm
/// against `Streamer::spawn`.
pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1_000;
26
27pub struct SubscriptionMetrics {
28    pub streaming_success: IntCounterVec,
29    pub streaming_failure: IntCounterVec,
30    pub streaming_active_subscriber_number: IntGaugeVec,
31    pub dropped_submissions: IntCounterVec,
32}
33
34impl SubscriptionMetrics {
35    pub fn new(registry: &Registry) -> Self {
36        Self {
37            streaming_success: register_int_counter_vec_with_registry!(
38                "streaming_success",
39                "Total number of items that are streamed successfully",
40                &["type"],
41                registry,
42            )
43            .unwrap(),
44            streaming_failure: register_int_counter_vec_with_registry!(
45                "streaming_failure",
46                "Total number of items that fail to be streamed",
47                &["type"],
48                registry,
49            )
50            .unwrap(),
51            streaming_active_subscriber_number: register_int_gauge_vec_with_registry!(
52                "streaming_active_subscriber_number",
53                "Current number of active subscribers",
54                &["type"],
55                registry,
56            )
57            .unwrap(),
58            dropped_submissions: register_int_counter_vec_with_registry!(
59                "streaming_dropped_submissions",
60                "Total number of submissions that are dropped",
61                &["type"],
62                registry,
63            )
64            .unwrap(),
65        }
66    }
67}
68
69pub struct SubscriptionHandler {
70    event_streamer: Streamer<IotaEvent, IotaEvent, EventFilter>,
71    transaction_streamer:
72        Streamer<EffectsWithInput, IotaTransactionBlockEffects, TransactionFilter>,
73}
74
75impl SubscriptionHandler {
76    pub fn new(registry: &Registry) -> Self {
77        let metrics = Arc::new(SubscriptionMetrics::new(registry));
78        Self {
79            event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics.clone(), "event"),
80            transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics, "tx"),
81        }
82    }
83}
84
85impl SubscriptionHandler {
86    #[instrument(level = "trace", skip_all, fields(tx_digest =? effects.transaction_digest()), err)]
87    pub fn process_tx(
88        &self,
89        input: &TransactionData,
90        effects: &IotaTransactionBlockEffects,
91        events: &IotaTransactionBlockEvents,
92    ) -> IotaResult {
93        trace!(
94            num_events = events.data.len(),
95            tx_digest =? effects.transaction_digest(),
96            "Processing tx/event subscription"
97        );
98
99        if let Err(e) = self.transaction_streamer.try_send(EffectsWithInput {
100            input: input.clone(),
101            effects: effects.clone(),
102        }) {
103            error!(error =? e, "Failed to send transaction to dispatch");
104        }
105
106        // serially dispatch event processing to honor events' orders.
107        for event in events.data.clone() {
108            if let Err(e) = self.event_streamer.try_send(event) {
109                error!(error =? e, "Failed to send event to dispatch");
110            }
111        }
112        Ok(())
113    }
114
115    pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = IotaEvent> {
116        self.event_streamer.subscribe(filter)
117    }
118
119    pub fn subscribe_transactions(
120        &self,
121        filter: TransactionFilter,
122    ) -> impl Stream<Item = IotaTransactionBlockEffects> {
123        self.transaction_streamer.subscribe(filter)
124    }
125}