iota_core/
subscription_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_json_rpc_types::{
8    EffectsWithInput, EventFilter, IotaEvent, IotaTransactionBlockEffects,
9    IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, TransactionFilter,
10};
11use iota_types::{error::IotaResult, transaction::TransactionData};
12use prometheus::{
13    IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
14    register_int_gauge_vec_with_registry,
15};
16use tokio_stream::Stream;
17use tracing::{error, instrument, trace};
18
19use crate::streamer::Streamer;
20
21#[cfg(test)]
22#[path = "unit_tests/subscription_handler_tests.rs"]
23mod subscription_handler_tests;
24
25pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;
26
27pub struct SubscriptionMetrics {
28    pub streaming_success: IntCounterVec,
29    pub streaming_failure: IntCounterVec,
30    pub streaming_active_subscriber_number: IntGaugeVec,
31}
32
33impl SubscriptionMetrics {
34    pub fn new(registry: &Registry) -> Self {
35        Self {
36            streaming_success: register_int_counter_vec_with_registry!(
37                "streaming_success",
38                "Total number of items that are streamed successfully",
39                &["type"],
40                registry,
41            )
42            .unwrap(),
43            streaming_failure: register_int_counter_vec_with_registry!(
44                "streaming_failure",
45                "Total number of items that fail to be streamed",
46                &["type"],
47                registry,
48            )
49            .unwrap(),
50            streaming_active_subscriber_number: register_int_gauge_vec_with_registry!(
51                "streaming_active_subscriber_number",
52                "Current number of active subscribers",
53                &["type"],
54                registry,
55            )
56            .unwrap(),
57        }
58    }
59}
60
61pub struct SubscriptionHandler {
62    event_streamer: Streamer<IotaEvent, IotaEvent, EventFilter>,
63    transaction_streamer:
64        Streamer<EffectsWithInput, IotaTransactionBlockEffects, TransactionFilter>,
65}
66
67impl SubscriptionHandler {
68    pub fn new(registry: &Registry) -> Self {
69        let metrics = Arc::new(SubscriptionMetrics::new(registry));
70        Self {
71            event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics.clone(), "event"),
72            transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics, "tx"),
73        }
74    }
75}
76
77impl SubscriptionHandler {
78    #[instrument(level = "trace", skip_all, fields(tx_digest =? effects.transaction_digest()), err)]
79    pub async fn process_tx(
80        &self,
81        input: &TransactionData,
82        effects: &IotaTransactionBlockEffects,
83        events: &IotaTransactionBlockEvents,
84    ) -> IotaResult {
85        trace!(
86            num_events = events.data.len(),
87            tx_digest =? effects.transaction_digest(),
88            "Processing tx/event subscription"
89        );
90
91        if let Err(e) = self
92            .transaction_streamer
93            .send(EffectsWithInput {
94                input: input.clone(),
95                effects: effects.clone(),
96            })
97            .await
98        {
99            error!(error =? e, "Failed to send transaction to dispatch");
100        }
101
102        // serially dispatch event processing to honor events' orders.
103        for event in events.data.clone() {
104            if let Err(e) = self.event_streamer.send(event).await {
105                error!(error =? e, "Failed to send event to dispatch");
106            }
107        }
108        Ok(())
109    }
110
111    pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = IotaEvent> {
112        self.event_streamer.subscribe(filter)
113    }
114
115    pub fn subscribe_transactions(
116        &self,
117        filter: TransactionFilter,
118    ) -> impl Stream<Item = IotaTransactionBlockEffects> {
119        self.transaction_streamer.subscribe(filter)
120    }
121}