1use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use futures::Stream;
8use iota_json_rpc_types::Filter;
9use iota_metrics::{metered_channel::Sender, spawn_monitored_task};
10use iota_types::{base_types::ObjectID, error::IotaError};
11use parking_lot::RwLock;
12use prometheus::Registry;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tracing::{debug, warn};
16
17use crate::subscription_handler::{EVENT_DISPATCH_BUFFER_SIZE, SubscriptionMetrics};
18
19type Subscribers<T, F> = Arc<RwLock<BTreeMap<String, (tokio::sync::mpsc::Sender<T>, F)>>>;
20
21pub struct Streamer<T, S, F: Filter<T>> {
25 streamer_queue: Sender<T>,
26 subscribers: Subscribers<S, F>,
27}
28
29impl<T, S, F> Streamer<T, S, F>
30where
31 S: From<T> + Clone + Debug + Send + Sync + 'static,
32 T: Clone + Send + Sync + 'static,
33 F: Filter<T> + Clone + Send + Sync + 'static + Clone,
34{
35 pub fn spawn(
36 buffer: usize,
37 metrics: Arc<SubscriptionMetrics>,
38 metrics_label: &'static str,
39 ) -> Self {
40 let channel_label = format!("streamer_{}", metrics_label);
41 let gauge = if let Some(metrics) = iota_metrics::get_metrics() {
42 metrics
43 .channel_inflight
44 .with_label_values(&[&channel_label])
45 } else {
46 iota_metrics::init_metrics(&Registry::default());
49 iota_metrics::get_metrics()
50 .unwrap()
51 .channel_inflight
52 .with_label_values(&[&channel_label])
53 };
54
55 let (tx, rx) = iota_metrics::metered_channel::channel(buffer, &gauge);
56 let streamer = Self {
57 streamer_queue: tx,
58 subscribers: Default::default(),
59 };
60 let mut rx = rx;
61 let subscribers = streamer.subscribers.clone();
62 spawn_monitored_task!(async move {
63 while let Some(data) = rx.recv().await {
64 Self::send_to_all_subscribers(
65 subscribers.clone(),
66 data,
67 metrics.clone(),
68 metrics_label,
69 )
70 .await;
71 }
72 });
73 streamer
74 }
75
76 async fn send_to_all_subscribers(
77 subscribers: Subscribers<S, F>,
78 data: T,
79 metrics: Arc<SubscriptionMetrics>,
80 metrics_label: &'static str,
81 ) {
82 let success_counter = metrics
83 .streaming_success
84 .with_label_values(&[metrics_label]);
85 let failure_counter = metrics
86 .streaming_failure
87 .with_label_values(&[metrics_label]);
88 let subscriber_count = metrics
89 .streaming_active_subscriber_number
90 .with_label_values(&[metrics_label]);
91
92 let to_remove = {
93 let mut to_remove = vec![];
94 let subscribers_snapshot = subscribers.read();
95 subscriber_count.set(subscribers_snapshot.len() as i64);
96
97 for (id, (subscriber, filter)) in subscribers_snapshot.iter() {
98 if !(filter.matches(&data)) {
99 continue;
100 }
101 let data = data.clone();
102 match subscriber.try_send(data.into()) {
103 Ok(_) => {
104 debug!(subscription_id = id, "Streaming data to subscriber.");
105 success_counter.inc();
106 }
107 Err(e) => {
108 warn!(
109 subscription_id = id,
110 "Error when streaming data, removing subscriber. Error: {e}"
111 );
112 to_remove.push(id.clone());
117 failure_counter.inc();
118 }
119 }
120 }
121 to_remove
122 };
123 if !to_remove.is_empty() {
124 let mut subscribers = subscribers.write();
125 for sub in to_remove {
126 subscribers.remove(&sub);
127 }
128 }
129 }
130
131 pub fn subscribe(&self, filter: F) -> impl Stream<Item = S> {
133 let (tx, rx) = mpsc::channel::<S>(EVENT_DISPATCH_BUFFER_SIZE);
134 self.subscribers
135 .write()
136 .insert(ObjectID::random().to_string(), (tx, filter));
137 ReceiverStream::new(rx)
138 }
139
140 pub async fn send(&self, data: T) -> Result<(), IotaError> {
141 self.streamer_queue
142 .send(data)
143 .await
144 .map_err(|e| IotaError::FailedToDispatchSubscription {
145 error: e.to_string(),
146 })
147 }
148}