1use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use futures::Stream;
8use iota_json_rpc_types::Filter;
9use iota_metrics::{metered_channel::Sender, spawn_monitored_task};
10use iota_types::{base_types::ObjectID, error::IotaError};
11use parking_lot::RwLock;
12use prometheus::Registry;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tracing::{debug, warn};
16
17use crate::subscription_handler::{EVENT_DISPATCH_BUFFER_SIZE, SubscriptionMetrics};
18
19type Subscribers<T, F> = Arc<RwLock<BTreeMap<String, (tokio::sync::mpsc::Sender<T>, F)>>>;
20
21pub struct Streamer<T, S, F: Filter<T>> {
25 streamer_queue: Sender<T>,
26 subscribers: Subscribers<S, F>,
27 metrics: Arc<SubscriptionMetrics>,
28 metrics_label: &'static str,
29}
30
31impl<T, S, F> Streamer<T, S, F>
32where
33 S: From<T> + Clone + Debug + Send + Sync + 'static,
34 T: Clone + Send + Sync + 'static,
35 F: Filter<T> + Clone + Send + Sync + 'static + Clone,
36{
37 pub fn spawn(
38 buffer: usize,
39 metrics: Arc<SubscriptionMetrics>,
40 metrics_label: &'static str,
41 ) -> Self {
42 let channel_label = format!("streamer_{metrics_label}");
43 let gauge = if let Some(metrics) = iota_metrics::get_metrics() {
44 metrics
45 .channel_inflight
46 .with_label_values(&[&channel_label])
47 } else {
48 iota_metrics::init_metrics(&Registry::default());
51 iota_metrics::get_metrics()
52 .unwrap()
53 .channel_inflight
54 .with_label_values(&[&channel_label])
55 };
56
57 let (tx, rx) = iota_metrics::metered_channel::channel(buffer, &gauge);
58 let streamer = Self {
59 streamer_queue: tx,
60 subscribers: Default::default(),
61 metrics: metrics.clone(),
62 metrics_label,
63 };
64 let mut rx = rx;
65 let subscribers = streamer.subscribers.clone();
66 spawn_monitored_task!(async move {
67 while let Some(data) = rx.recv().await {
68 Self::send_to_all_subscribers(
69 subscribers.clone(),
70 data,
71 metrics.clone(),
72 metrics_label,
73 )
74 .await;
75 }
76 });
77 streamer
78 }
79
80 async fn send_to_all_subscribers(
81 subscribers: Subscribers<S, F>,
82 data: T,
83 metrics: Arc<SubscriptionMetrics>,
84 metrics_label: &'static str,
85 ) {
86 let success_counter = metrics
87 .streaming_success
88 .with_label_values(&[metrics_label]);
89 let failure_counter = metrics
90 .streaming_failure
91 .with_label_values(&[metrics_label]);
92 let subscriber_count = metrics
93 .streaming_active_subscriber_number
94 .with_label_values(&[metrics_label]);
95
96 let to_remove = {
97 let mut to_remove = vec![];
98 let subscribers_snapshot = subscribers.read();
99 subscriber_count.set(subscribers_snapshot.len() as i64);
100
101 for (id, (subscriber, filter)) in subscribers_snapshot.iter() {
102 if !(filter.matches(&data)) {
103 continue;
104 }
105 let data = data.clone();
106 match subscriber.try_send(data.into()) {
107 Ok(_) => {
108 debug!(subscription_id = id, "Streaming data to subscriber.");
109 success_counter.inc();
110 }
111 Err(e) => {
112 warn!(
113 subscription_id = id,
114 "Error when streaming data, removing subscriber. Error: {e}"
115 );
116 to_remove.push(id.clone());
121 failure_counter.inc();
122 }
123 }
124 }
125 to_remove
126 };
127 if !to_remove.is_empty() {
128 let mut subscribers = subscribers.write();
129 for sub in to_remove {
130 subscribers.remove(&sub);
131 }
132 }
133 }
134
135 pub fn subscribe(&self, filter: F) -> impl Stream<Item = S> {
137 let (tx, rx) = mpsc::channel::<S>(EVENT_DISPATCH_BUFFER_SIZE);
138 self.subscribers
139 .write()
140 .insert(ObjectID::random().to_string(), (tx, filter));
141 ReceiverStream::new(rx)
142 }
143
144 pub fn try_send(&self, data: T) -> Result<(), IotaError> {
145 self.streamer_queue.try_send(data).map_err(|e| {
146 self.metrics
147 .dropped_submissions
148 .with_label_values(&[self.metrics_label])
149 .inc();
150
151 IotaError::FailedToDispatchSubscription {
152 error: e.to_string(),
153 }
154 })
155 }
156}