iota_core/
streamer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
6
7use futures::Stream;
8use iota_json_rpc_types::Filter;
9use iota_metrics::{metered_channel::Sender, spawn_monitored_task};
10use iota_types::{base_types::ObjectID, error::IotaError};
11use parking_lot::RwLock;
12use prometheus::Registry;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tracing::{debug, warn};
16
17use crate::subscription_handler::{EVENT_DISPATCH_BUFFER_SIZE, SubscriptionMetrics};
18
19type Subscribers<T, F> = Arc<RwLock<BTreeMap<String, (tokio::sync::mpsc::Sender<T>, F)>>>;
20
21/// The Streamer splits a mpsc channel into multiple mpsc channels using the
22/// subscriber's `Filter<T>` object. Data will be sent to the subscribers in
23/// parallel and the subscription will be dropped if it received a send error.
24pub struct Streamer<T, S, F: Filter<T>> {
25    streamer_queue: Sender<T>,
26    subscribers: Subscribers<S, F>,
27}
28
29impl<T, S, F> Streamer<T, S, F>
30where
31    S: From<T> + Clone + Debug + Send + Sync + 'static,
32    T: Clone + Send + Sync + 'static,
33    F: Filter<T> + Clone + Send + Sync + 'static + Clone,
34{
35    pub fn spawn(
36        buffer: usize,
37        metrics: Arc<SubscriptionMetrics>,
38        metrics_label: &'static str,
39    ) -> Self {
40        let channel_label = format!("streamer_{}", metrics_label);
41        let gauge = if let Some(metrics) = iota_metrics::get_metrics() {
42            metrics
43                .channel_inflight
44                .with_label_values(&[&channel_label])
45        } else {
46            // We call init_metrics very early when starting a node. Therefore when this
47            // happens, it's probably in a test.
48            iota_metrics::init_metrics(&Registry::default());
49            iota_metrics::get_metrics()
50                .unwrap()
51                .channel_inflight
52                .with_label_values(&[&channel_label])
53        };
54
55        let (tx, rx) = iota_metrics::metered_channel::channel(buffer, &gauge);
56        let streamer = Self {
57            streamer_queue: tx,
58            subscribers: Default::default(),
59        };
60        let mut rx = rx;
61        let subscribers = streamer.subscribers.clone();
62        spawn_monitored_task!(async move {
63            while let Some(data) = rx.recv().await {
64                Self::send_to_all_subscribers(
65                    subscribers.clone(),
66                    data,
67                    metrics.clone(),
68                    metrics_label,
69                )
70                .await;
71            }
72        });
73        streamer
74    }
75
76    async fn send_to_all_subscribers(
77        subscribers: Subscribers<S, F>,
78        data: T,
79        metrics: Arc<SubscriptionMetrics>,
80        metrics_label: &'static str,
81    ) {
82        let success_counter = metrics
83            .streaming_success
84            .with_label_values(&[metrics_label]);
85        let failure_counter = metrics
86            .streaming_failure
87            .with_label_values(&[metrics_label]);
88        let subscriber_count = metrics
89            .streaming_active_subscriber_number
90            .with_label_values(&[metrics_label]);
91
92        let to_remove = {
93            let mut to_remove = vec![];
94            let subscribers_snapshot = subscribers.read();
95            subscriber_count.set(subscribers_snapshot.len() as i64);
96
97            for (id, (subscriber, filter)) in subscribers_snapshot.iter() {
98                if !(filter.matches(&data)) {
99                    continue;
100                }
101                let data = data.clone();
102                match subscriber.try_send(data.into()) {
103                    Ok(_) => {
104                        debug!(subscription_id = id, "Streaming data to subscriber.");
105                        success_counter.inc();
106                    }
107                    Err(e) => {
108                        warn!(
109                            subscription_id = id,
110                            "Error when streaming data, removing subscriber. Error: {e}"
111                        );
112                        // It does not matter what the error is - channel full or closed, we remove
113                        // the subscriber. In the case of a full channel,
114                        // this nudges the subscriber to catch up separately and not
115                        // miss any data.
116                        to_remove.push(id.clone());
117                        failure_counter.inc();
118                    }
119                }
120            }
121            to_remove
122        };
123        if !to_remove.is_empty() {
124            let mut subscribers = subscribers.write();
125            for sub in to_remove {
126                subscribers.remove(&sub);
127            }
128        }
129    }
130
131    /// Subscribe to the data stream filtered by the filter object.
132    pub fn subscribe(&self, filter: F) -> impl Stream<Item = S> {
133        let (tx, rx) = mpsc::channel::<S>(EVENT_DISPATCH_BUFFER_SIZE);
134        self.subscribers
135            .write()
136            .insert(ObjectID::random().to_string(), (tx, filter));
137        ReceiverStream::new(rx)
138    }
139
140    pub async fn send(&self, data: T) -> Result<(), IotaError> {
141        self.streamer_queue
142            .send(data)
143            .await
144            .map_err(|e| IotaError::FailedToDispatchSubscription {
145                error: e.to_string(),
146            })
147    }
148}