Skip to main content

espresso_node/
proposal_fetcher.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::Context;
4use async_channel::{Receiver, Sender};
5use clap::Parser;
6use committable::Commitment;
7use derivative::Derivative;
8use espresso_types::{PubKey, ValidatedState, parse_duration, v0::traits::SequencerPersistence};
9use futures::stream::StreamExt;
10use hotshot_types::{
11    data::{Leaf2, QuorumProposalWrapper, ViewNumber},
12    event::{Event, EventType},
13    message::Proposal,
14    new_protocol::CoordinatorEvent,
15    traits::{
16        ValidatedState as _,
17        metrics::{Counter, Gauge, Metrics},
18        network::ConnectedNetwork,
19    },
20};
21use serde::Serialize;
22use tokio::time::{sleep, timeout};
23use tracing::Instrument;
24
25use crate::{
26    SeqTypes,
27    consensus_handle::ConsensusHandle,
28    context::{ConsensusNode, TaskList},
29};
30
// Command-line / environment configuration for the background proposal fetcher.
//
// NOTE(review): plain `//` comments are used here on purpose. With `#[derive(Parser)]`, `///`
// doc comments would presumably be picked up as CLI help/about text and so change the
// program's output -- confirm before converting these to doc comments.
#[derive(Clone, Copy, Debug, Parser, Serialize)]
pub struct ProposalFetcherConfig {
    // Number of "proposal fetcher {i}" worker tasks started by `spawn`. Defaults to 2.
    #[clap(
        long = "proposal-fetcher-num-workers",
        env = "ESPRESSO_NODE_PROPOSAL_FETCHER_NUM_WORKERS",
        default_value = "2"
    )]
    pub num_workers: usize,

    // Maximum time a worker waits for a single proposal to arrive from the network before
    // the attempt is treated as failed ("timed out fetching proposal"). Defaults to "2s" and
    // is parsed with `parse_duration`.
    #[clap(
        long = "proposal-fetcher-fetch-timeout",
        env = "ESPRESSO_NODE_PROPOSAL_FETCHER_FETCH_TIMEOUT",
        default_value = "2s",
        value_parser = parse_duration,
    )]
    pub fetch_timeout: Duration,
}
48
49impl Default for ProposalFetcherConfig {
50    fn default() -> Self {
51        Self::parse_from(std::iter::empty::<String>())
52    }
53}
54
55impl ProposalFetcherConfig {
56    pub(crate) fn spawn<N, P>(
57        self,
58        tasks: &mut TaskList,
59        consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
60        persistence: Arc<P>,
61        metrics: &(impl Metrics + ?Sized),
62    ) where
63        N: ConnectedNetwork<PubKey>,
64        P: SequencerPersistence,
65    {
66        let (sender, receiver) = async_channel::unbounded();
67        let fetcher = ProposalFetcher {
68            sender,
69            consensus_handle,
70            persistence,
71            cfg: self,
72            metrics: ProposalFetcherMetrics::new(metrics),
73        };
74
75        tasks.spawn("proposal scanner", fetcher.clone().scan());
76        for i in 0..self.num_workers {
77            tasks.spawn(
78                format!("proposal fetcher {i}"),
79                fetcher.clone().fetch(receiver.clone()),
80            );
81        }
82    }
83}
84
/// Metric handles used by the proposal fetcher tasks.
#[derive(Clone, Debug)]
struct ProposalFetcherMetrics {
    /// Incremented once for every proposal that was fetched and saved successfully.
    fetched: Arc<dyn Counter>,
    /// Incremented once for every request whose processing ended in an error.
    failed: Arc<dyn Counter>,
    /// Length of the request queue, as sampled when a request is enqueued.
    queue_len: Arc<dyn Gauge>,
    /// View number of the most recently enqueued request.
    last_seen: Arc<dyn Gauge>,
    /// View number of the most recently fetched proposal.
    last_fetched: Arc<dyn Gauge>,
}
93
94impl ProposalFetcherMetrics {
95    fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
96        let metrics = metrics.subgroup("proposal_fetcher".into());
97        Self {
98            fetched: metrics.create_counter("fetched".into(), None).into(),
99            failed: metrics.create_counter("failed".into(), None).into(),
100            queue_len: metrics.create_gauge("queue_len".into(), None).into(),
101            last_seen: metrics
102                .create_gauge("last_seen".into(), Some("view".into()))
103                .into(),
104            last_fetched: metrics
105                .create_gauge("last_fetched".into(), Some("view".into()))
106                .into(),
107        }
108    }
109}
110
/// A queued fetch request: the view of the wanted proposal together with the commitment of the
/// leaf it is expected to correspond to.
type Request = (ViewNumber, Commitment<Leaf2<SeqTypes>>);
112
/// State shared by the proposal scanner task and the proposal fetcher worker tasks; each task
/// runs on its own clone.
///
/// `Clone` and `Debug` are derived with empty bounds, presumably so that neither requires
/// `N` or `P` to implement the trait themselves.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
struct ProposalFetcher<N, P>
where
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
{
    /// Sending half of the request queue drained by the worker tasks.
    sender: Sender<Request>,
    /// Source of consensus events and of network proposal requests; omitted from `Debug`.
    #[derivative(Debug = "ignore")]
    consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
    /// Storage for the anchor view and quorum proposals; omitted from `Debug`.
    #[derivative(Debug = "ignore")]
    persistence: Arc<P>,
    /// Configuration this fetcher was spawned with.
    cfg: ProposalFetcherConfig,
    /// Metric handles updated by the tasks.
    metrics: ProposalFetcherMetrics,
}
128
129impl<N, P> ProposalFetcher<N, P>
130where
131    N: ConnectedNetwork<PubKey>,
132    P: SequencerPersistence,
133{
134    #[tracing::instrument(skip_all)]
135    async fn scan(self) {
136        let mut events = self.consensus_handle.event_stream();
137        while let Some(event) = events.next().await {
138            let (parent_view, parent_leaf) = match event {
139                CoordinatorEvent::LegacyEvent(Event {
140                    event: EventType::QuorumProposal { proposal, .. },
141                    ..
142                }) => {
143                    let parent_view = proposal.data.justify_qc().view_number;
144                    let parent_leaf = proposal.data.justify_qc().data.leaf_commit;
145                    (parent_view, parent_leaf)
146                },
147                CoordinatorEvent::QuorumProposal { proposal, .. } => {
148                    let parent_view = proposal.data.justify_qc.view_number;
149                    let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
150                    (parent_view, parent_leaf)
151                },
152                _ => continue,
153            };
154            // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back
155            // to the anchor. This allows state replay from the decided state.
156            self.request((parent_view, parent_leaf)).await;
157        }
158    }
159
160    #[tracing::instrument(skip_all)]
161    async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>) {
162        let mut receiver = std::pin::pin!(receiver);
163        while let Some(req) = receiver.next().await {
164            self.fetch_request(req).await;
165        }
166    }
167
168    async fn request(&self, req: Request) {
169        self.sender.send(req).await.ok();
170        self.metrics.queue_len.set(self.sender.len());
171        self.metrics.last_seen.set(req.0.u64() as usize);
172    }
173
174    async fn fetch_request(&self, (view, leaf): Request) {
175        let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
176        let res: anyhow::Result<()> = async {
177            let anchor_view = self
178                .persistence
179                .load_anchor_view()
180                .await
181                .context("loading anchor view")?;
182            if view <= anchor_view {
183                tracing::debug!(?anchor_view, "skipping already-decided proposal");
184                return Ok(());
185            }
186
187            match self.persistence.load_quorum_proposal(view).await {
188                Ok(proposal) => {
189                    // If we already have the proposal in storage, keep traversing the chain to its
190                    // parent.
191                    let view = proposal.data.justify_qc().view_number;
192                    let leaf = proposal.data.justify_qc().data.leaf_commit;
193                    self.request((view, leaf)).await;
194                    return Ok(());
195                },
196                Err(err) => {
197                    tracing::info!("proposal missing from storage; fetching from network: {err:#}");
198                },
199            }
200
201            let future = self.consensus_handle.request_proposal(view, leaf).await?;
202            let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
203                timeout(self.cfg.fetch_timeout, future)
204                    .await
205                    .context("timed out fetching proposal")?
206                    .context("error fetching proposal")?;
207            self.persistence
208                .append_quorum_proposal2(&proposal)
209                .await
210                .context("error saving fetched proposal")?;
211
212            // Add the fetched leaf to consensus state, so consensus can make use of it.
213            // Only update if the view is missing or DA-only (state() returns None for
214            // both cases) — don't overwrite an existing Leaf view.
215            let leaf = Leaf2::from_quorum_proposal(&proposal.data);
216            if self.consensus_handle.state(view).await.is_none() {
217                let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
218                if let Err(err) = self.consensus_handle.update_leaf(leaf, state, None).await {
219                    tracing::info!("unable to update leaf: {err:#}");
220                }
221            }
222
223            self.metrics.last_fetched.set(view.u64() as usize);
224            self.metrics.fetched.add(1);
225
226            Ok(())
227        }
228        .instrument(span)
229        .await;
230        if let Err(err) = res {
231            tracing::warn!("failed to fetch proposal: {err:#}");
232            self.metrics.failed.add(1);
233
234            // Avoid busy loop when operations are failing.
235            sleep(Duration::from_secs(1)).await;
236
237            // If we fail fetching the proposal, don't let it clog up the fetching task. Just push
238            // it back onto the queue and move onto the next proposal.
239            self.request((view, leaf)).await;
240        }
241    }
242}