iota_core/
subscription_handler.rs1use std::sync::Arc;
6
7use iota_json_rpc_types::{
8 EffectsWithInput, EventFilter, IotaEvent, IotaTransactionBlockEffects,
9 IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, TransactionFilter,
10};
11use iota_types::{error::IotaResult, transaction::TransactionData};
12use prometheus::{
13 IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
14 register_int_gauge_vec_with_registry,
15};
16use tokio_stream::Stream;
17use tracing::{error, instrument, trace};
18
19use crate::streamer::Streamer;
20
21#[cfg(test)]
22#[path = "unit_tests/subscription_handler_tests.rs"]
23mod subscription_handler_tests;
24
25pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;
26
27pub struct SubscriptionMetrics {
28 pub streaming_success: IntCounterVec,
29 pub streaming_failure: IntCounterVec,
30 pub streaming_active_subscriber_number: IntGaugeVec,
31}
32
33impl SubscriptionMetrics {
34 pub fn new(registry: &Registry) -> Self {
35 Self {
36 streaming_success: register_int_counter_vec_with_registry!(
37 "streaming_success",
38 "Total number of items that are streamed successfully",
39 &["type"],
40 registry,
41 )
42 .unwrap(),
43 streaming_failure: register_int_counter_vec_with_registry!(
44 "streaming_failure",
45 "Total number of items that fail to be streamed",
46 &["type"],
47 registry,
48 )
49 .unwrap(),
50 streaming_active_subscriber_number: register_int_gauge_vec_with_registry!(
51 "streaming_active_subscriber_number",
52 "Current number of active subscribers",
53 &["type"],
54 registry,
55 )
56 .unwrap(),
57 }
58 }
59}
60
61pub struct SubscriptionHandler {
62 event_streamer: Streamer<IotaEvent, IotaEvent, EventFilter>,
63 transaction_streamer:
64 Streamer<EffectsWithInput, IotaTransactionBlockEffects, TransactionFilter>,
65}
66
67impl SubscriptionHandler {
68 pub fn new(registry: &Registry) -> Self {
69 let metrics = Arc::new(SubscriptionMetrics::new(registry));
70 Self {
71 event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics.clone(), "event"),
72 transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics, "tx"),
73 }
74 }
75}
76
77impl SubscriptionHandler {
78 #[instrument(level = "trace", skip_all, fields(tx_digest =? effects.transaction_digest()), err)]
79 pub async fn process_tx(
80 &self,
81 input: &TransactionData,
82 effects: &IotaTransactionBlockEffects,
83 events: &IotaTransactionBlockEvents,
84 ) -> IotaResult {
85 trace!(
86 num_events = events.data.len(),
87 tx_digest =? effects.transaction_digest(),
88 "Processing tx/event subscription"
89 );
90
91 if let Err(e) = self
92 .transaction_streamer
93 .send(EffectsWithInput {
94 input: input.clone(),
95 effects: effects.clone(),
96 })
97 .await
98 {
99 error!(error =? e, "Failed to send transaction to dispatch");
100 }
101
102 for event in events.data.clone() {
104 if let Err(e) = self.event_streamer.send(event).await {
105 error!(error =? e, "Failed to send event to dispatch");
106 }
107 }
108 Ok(())
109 }
110
111 pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = IotaEvent> {
112 self.event_streamer.subscribe(filter)
113 }
114
115 pub fn subscribe_transactions(
116 &self,
117 filter: TransactionFilter,
118 ) -> impl Stream<Item = IotaTransactionBlockEffects> {
119 self.transaction_streamer.subscribe(filter)
120 }
121}