//! espresso_node/proposal_fetcher.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::Context;
4use async_channel::{Receiver, Sender};
5use async_lock::RwLock;
6use clap::Parser;
7use committable::Commitment;
8use derivative::Derivative;
9use espresso_types::{PubKey, ValidatedState, parse_duration, v0::traits::SequencerPersistence};
10use futures::stream::StreamExt;
11use hotshot::types::EventType;
12use hotshot_types::{
13    data::{Leaf2, ViewNumber},
14    traits::{
15        ValidatedState as _,
16        metrics::{Counter, Gauge, Metrics},
17        network::ConnectedNetwork,
18    },
19    utils::{View, ViewInner},
20};
21use tokio::time::{sleep, timeout};
22use tracing::Instrument;
23
24use crate::{
25    SeqTypes,
26    context::{Consensus, TaskList},
27};
28
/// Configuration for the background proposal-fetcher tasks.
///
/// Parsed from CLI flags / environment variables via `clap`; see
/// [`ProposalFetcherConfig::spawn`] for how the tasks are started.
#[derive(Clone, Copy, Debug, Parser)]
pub struct ProposalFetcherConfig {
    /// Number of worker tasks concurrently servicing fetch requests.
    #[clap(
        long = "proposal-fetcher-num-workers",
        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
        default_value = "2"
    )]
    pub num_workers: usize,

    /// Timeout applied to each individual network fetch of a proposal.
    #[clap(
        long = "proposal-fetcher-fetch-timeout",
        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
        default_value = "2s",
        value_parser = parse_duration,
    )]
    pub fetch_timeout: Duration,
}
46
47impl Default for ProposalFetcherConfig {
48    fn default() -> Self {
49        Self::parse_from(std::iter::empty::<String>())
50    }
51}
52
53impl ProposalFetcherConfig {
54    pub(crate) fn spawn<N, P>(
55        self,
56        tasks: &mut TaskList,
57        consensus: Arc<RwLock<Consensus<N, P>>>,
58        persistence: Arc<P>,
59        metrics: &(impl Metrics + ?Sized),
60    ) where
61        N: ConnectedNetwork<PubKey>,
62        P: SequencerPersistence,
63    {
64        let (sender, receiver) = async_channel::unbounded();
65        let fetcher = ProposalFetcher {
66            sender,
67            consensus,
68            persistence,
69            cfg: self,
70            metrics: ProposalFetcherMetrics::new(metrics),
71        };
72
73        tasks.spawn("proposal scanner", fetcher.clone().scan());
74        for i in 0..self.num_workers {
75            tasks.spawn(
76                format!("proposal fetcher {i}"),
77                fetcher.clone().fetch(receiver.clone()),
78            );
79        }
80    }
81}
82
/// Prometheus-style metrics for the proposal fetcher, registered under the
/// `proposal_fetcher` subgroup.
#[derive(Clone, Debug)]
struct ProposalFetcherMetrics {
    // Count of proposals successfully fetched from the network.
    fetched: Arc<dyn Counter>,
    // Count of fetch attempts that ended in an error (they are re-queued).
    failed: Arc<dyn Counter>,
    // Current length of the shared request queue.
    queue_len: Arc<dyn Gauge>,
    // View number of the most recently enqueued request.
    last_seen: Arc<dyn Gauge>,
    // View number of the most recently fetched-and-stored proposal.
    last_fetched: Arc<dyn Gauge>,
}
91
92impl ProposalFetcherMetrics {
93    fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
94        let metrics = metrics.subgroup("proposal_fetcher".into());
95        Self {
96            fetched: metrics.create_counter("fetched".into(), None).into(),
97            failed: metrics.create_counter("failed".into(), None).into(),
98            queue_len: metrics.create_gauge("queue_len".into(), None).into(),
99            last_seen: metrics
100                .create_gauge("last_seen".into(), Some("view".into()))
101                .into(),
102            last_fetched: metrics
103                .create_gauge("last_fetched".into(), Some("view".into()))
104                .into(),
105        }
106    }
107}
108
/// A request to fetch the quorum proposal for a view, expected to commit to
/// the given leaf.
type Request = (ViewNumber, Commitment<Leaf2<SeqTypes>>);
110
/// Shared state for the scanner and worker tasks that backfill missing quorum
/// proposals from the network into persistent storage.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
struct ProposalFetcher<N, P>
where
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
{
    // Producer side of the shared request queue; workers hold the receiver.
    sender: Sender<Request>,
    #[derivative(Debug = "ignore")]
    consensus: Arc<RwLock<Consensus<N, P>>>,
    #[derivative(Debug = "ignore")]
    persistence: Arc<P>,
    cfg: ProposalFetcherConfig,
    metrics: ProposalFetcherMetrics,
}
126
127impl<N, P> ProposalFetcher<N, P>
128where
129    N: ConnectedNetwork<PubKey>,
130    P: SequencerPersistence,
131{
132    #[tracing::instrument(skip_all)]
133    async fn scan(self) {
134        let mut events = self.consensus.read().await.event_stream();
135        while let Some(event) = events.next().await {
136            let EventType::QuorumProposal { proposal, .. } = event.event else {
137                continue;
138            };
139            // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back
140            // to the anchor. This allows state replay from the decided state.
141            let parent_view = proposal.data.justify_qc().view_number;
142            let parent_leaf = proposal.data.justify_qc().data.leaf_commit;
143            self.request((parent_view, parent_leaf)).await;
144        }
145    }
146
147    #[tracing::instrument(skip_all)]
148    async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>) {
149        let mut receiver = std::pin::pin!(receiver);
150        while let Some(req) = receiver.next().await {
151            self.fetch_request(req).await;
152        }
153    }
154
155    async fn request(&self, req: Request) {
156        self.sender.send(req).await.ok();
157        self.metrics.queue_len.set(self.sender.len());
158        self.metrics.last_seen.set(req.0.u64() as usize);
159    }
160
161    async fn fetch_request(&self, (view, leaf): Request) {
162        let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
163        let res: anyhow::Result<()> = async {
164            let anchor_view = self
165                .persistence
166                .load_anchor_view()
167                .await
168                .context("loading anchor view")?;
169            if view <= anchor_view {
170                tracing::debug!(?anchor_view, "skipping already-decided proposal");
171                return Ok(());
172            }
173
174            match self.persistence.load_quorum_proposal(view).await {
175                Ok(proposal) => {
176                    // If we already have the proposal in storage, keep traversing the chain to its
177                    // parent.
178                    let view = proposal.data.justify_qc().view_number;
179                    let leaf = proposal.data.justify_qc().data.leaf_commit;
180                    self.request((view, leaf)).await;
181                    return Ok(());
182                },
183                Err(err) => {
184                    tracing::info!("proposal missing from storage; fetching from network: {err:#}");
185                },
186            }
187
188            let future = self.consensus.read().await.request_proposal(view, leaf)?;
189            let proposal = timeout(self.cfg.fetch_timeout, future)
190                .await
191                .context("timed out fetching proposal")?
192                .context("error fetching proposal")?;
193            self.persistence
194                .append_quorum_proposal2(&proposal)
195                .await
196                .context("error saving fetched proposal")?;
197
198            // Add the fetched leaf to HotShot state, so consensus can make use of it.
199            let leaf = Leaf2::from_quorum_proposal(&proposal.data);
200            let handle = self.consensus.read().await;
201            let consensus = handle.consensus();
202            let mut consensus = consensus.write().await;
203            if matches!(
204                consensus.validated_state_map().get(&view),
205                None | Some(View {
206                    // Replace a Da-only view with a Leaf view, which has strictly more information.
207                    view_inner: ViewInner::Da { .. }
208                })
209            ) {
210                let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
211                if let Err(err) = consensus.update_leaf(leaf, state, None) {
212                    tracing::warn!("unable to update leaf: {err:#}");
213                }
214            }
215
216            self.metrics.last_fetched.set(view.u64() as usize);
217            self.metrics.fetched.add(1);
218
219            Ok(())
220        }
221        .instrument(span)
222        .await;
223        if let Err(err) = res {
224            tracing::warn!("failed to fetch proposal: {err:#}");
225            self.metrics.failed.add(1);
226
227            // Avoid busy loop when operations are failing.
228            sleep(Duration::from_secs(1)).await;
229
230            // If we fail fetching the proposal, don't let it clog up the fetching task. Just push
231            // it back onto the queue and move onto the next proposal.
232            self.request((view, leaf)).await;
233        }
234    }
235}