iota_core/
subscription_handler.rs1use std::sync::Arc;
6
7use iota_json_rpc_types::{
8 EffectsWithInput, EventFilter, IotaEvent, IotaTransactionBlockEffects,
9 IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, TransactionFilter,
10};
11use iota_types::{error::IotaResult, transaction::TransactionData};
12use prometheus::{
13 IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
14 register_int_gauge_vec_with_registry,
15};
16use tokio_stream::Stream;
17use tracing::{error, instrument, trace};
18
19use crate::streamer::Streamer;
20
21#[cfg(test)]
22#[path = "unit_tests/subscription_handler_tests.rs"]
23mod subscription_handler_tests;
24
25pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;
26
27pub struct SubscriptionMetrics {
28 pub streaming_success: IntCounterVec,
29 pub streaming_failure: IntCounterVec,
30 pub streaming_active_subscriber_number: IntGaugeVec,
31 pub dropped_submissions: IntCounterVec,
32}
33
34impl SubscriptionMetrics {
35 pub fn new(registry: &Registry) -> Self {
36 Self {
37 streaming_success: register_int_counter_vec_with_registry!(
38 "streaming_success",
39 "Total number of items that are streamed successfully",
40 &["type"],
41 registry,
42 )
43 .unwrap(),
44 streaming_failure: register_int_counter_vec_with_registry!(
45 "streaming_failure",
46 "Total number of items that fail to be streamed",
47 &["type"],
48 registry,
49 )
50 .unwrap(),
51 streaming_active_subscriber_number: register_int_gauge_vec_with_registry!(
52 "streaming_active_subscriber_number",
53 "Current number of active subscribers",
54 &["type"],
55 registry,
56 )
57 .unwrap(),
58 dropped_submissions: register_int_counter_vec_with_registry!(
59 "streaming_dropped_submissions",
60 "Total number of submissions that are dropped",
61 &["type"],
62 registry,
63 )
64 .unwrap(),
65 }
66 }
67}
68
69pub struct SubscriptionHandler {
70 event_streamer: Streamer<IotaEvent, IotaEvent, EventFilter>,
71 transaction_streamer:
72 Streamer<EffectsWithInput, IotaTransactionBlockEffects, TransactionFilter>,
73}
74
75impl SubscriptionHandler {
76 pub fn new(registry: &Registry) -> Self {
77 let metrics = Arc::new(SubscriptionMetrics::new(registry));
78 Self {
79 event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics.clone(), "event"),
80 transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics, "tx"),
81 }
82 }
83}
84
85impl SubscriptionHandler {
86 #[instrument(level = "trace", skip_all, fields(tx_digest =? effects.transaction_digest()), err)]
87 pub fn process_tx(
88 &self,
89 input: &TransactionData,
90 effects: &IotaTransactionBlockEffects,
91 events: &IotaTransactionBlockEvents,
92 ) -> IotaResult {
93 trace!(
94 num_events = events.data.len(),
95 tx_digest =? effects.transaction_digest(),
96 "Processing tx/event subscription"
97 );
98
99 if let Err(e) = self.transaction_streamer.try_send(EffectsWithInput {
100 input: input.clone(),
101 effects: effects.clone(),
102 }) {
103 error!(error =? e, "Failed to send transaction to dispatch");
104 }
105
106 for event in events.data.clone() {
108 if let Err(e) = self.event_streamer.try_send(event) {
109 error!(error =? e, "Failed to send event to dispatch");
110 }
111 }
112 Ok(())
113 }
114
115 pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = IotaEvent> {
116 self.event_streamer.subscribe(filter)
117 }
118
119 pub fn subscribe_transactions(
120 &self,
121 filter: TransactionFilter,
122 ) -> impl Stream<Item = IotaTransactionBlockEffects> {
123 self.transaction_streamer.subscribe(filter)
124 }
125}