1use std::{sync::Arc, time::Duration};
2
3use anyhow::Context;
4use async_channel::{Receiver, Sender};
5use clap::Parser;
6use committable::Commitment;
7use derivative::Derivative;
8use espresso_types::{PubKey, ValidatedState, parse_duration, v0::traits::SequencerPersistence};
9use futures::stream::StreamExt;
10use hotshot_types::{
11 data::{Leaf2, QuorumProposalWrapper, ViewNumber},
12 event::{Event, EventType},
13 message::Proposal,
14 new_protocol::CoordinatorEvent,
15 traits::{
16 ValidatedState as _,
17 metrics::{Counter, Gauge, Metrics},
18 network::ConnectedNetwork,
19 },
20};
21use serde::Serialize;
22use tokio::time::{sleep, timeout};
23use tracing::Instrument;
24
25use crate::{
26 SeqTypes,
27 consensus_handle::ConsensusHandle,
28 context::{ConsensusNode, TaskList},
29};
30
31#[derive(Clone, Copy, Debug, Parser, Serialize)]
32pub struct ProposalFetcherConfig {
33 #[clap(
34 long = "proposal-fetcher-num-workers",
35 env = "ESPRESSO_NODE_PROPOSAL_FETCHER_NUM_WORKERS",
36 default_value = "2"
37 )]
38 pub num_workers: usize,
39
40 #[clap(
41 long = "proposal-fetcher-fetch-timeout",
42 env = "ESPRESSO_NODE_PROPOSAL_FETCHER_FETCH_TIMEOUT",
43 default_value = "2s",
44 value_parser = parse_duration,
45 )]
46 pub fetch_timeout: Duration,
47}
48
49impl Default for ProposalFetcherConfig {
50 fn default() -> Self {
51 Self::parse_from(std::iter::empty::<String>())
52 }
53}
54
55impl ProposalFetcherConfig {
56 pub(crate) fn spawn<N, P>(
57 self,
58 tasks: &mut TaskList,
59 consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
60 persistence: Arc<P>,
61 metrics: &(impl Metrics + ?Sized),
62 ) where
63 N: ConnectedNetwork<PubKey>,
64 P: SequencerPersistence,
65 {
66 let (sender, receiver) = async_channel::unbounded();
67 let fetcher = ProposalFetcher {
68 sender,
69 consensus_handle,
70 persistence,
71 cfg: self,
72 metrics: ProposalFetcherMetrics::new(metrics),
73 };
74
75 tasks.spawn("proposal scanner", fetcher.clone().scan());
76 for i in 0..self.num_workers {
77 tasks.spawn(
78 format!("proposal fetcher {i}"),
79 fetcher.clone().fetch(receiver.clone()),
80 );
81 }
82 }
83}
84
85#[derive(Clone, Debug)]
86struct ProposalFetcherMetrics {
87 fetched: Arc<dyn Counter>,
88 failed: Arc<dyn Counter>,
89 queue_len: Arc<dyn Gauge>,
90 last_seen: Arc<dyn Gauge>,
91 last_fetched: Arc<dyn Gauge>,
92}
93
94impl ProposalFetcherMetrics {
95 fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
96 let metrics = metrics.subgroup("proposal_fetcher".into());
97 Self {
98 fetched: metrics.create_counter("fetched".into(), None).into(),
99 failed: metrics.create_counter("failed".into(), None).into(),
100 queue_len: metrics.create_gauge("queue_len".into(), None).into(),
101 last_seen: metrics
102 .create_gauge("last_seen".into(), Some("view".into()))
103 .into(),
104 last_fetched: metrics
105 .create_gauge("last_fetched".into(), Some("view".into()))
106 .into(),
107 }
108 }
109}
110
/// A unit of work for the fetcher: the view of a wanted proposal paired with a leaf
/// commitment, both taken from a justify QC.
type Request = (ViewNumber, Commitment<Leaf2<SeqTypes>>);

/// State shared (by cloning) between the scanner task and all worker tasks.
///
/// `Clone` and `Debug` are derived with empty bounds so that neither `N` nor `P` has to
/// implement them; the fields that depend on those parameters are skipped in `Debug` output.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
struct ProposalFetcher<N, P>
where
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
{
    /// Producer side of the request queue consumed by the workers.
    sender: Sender<Request>,
    /// Source of consensus events and the interface used to request proposals and update
    /// leaves.
    #[derivative(Debug = "ignore")]
    consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
    /// Storage consulted for the anchor view and for already-saved proposals, and where
    /// fetched proposals are appended.
    #[derivative(Debug = "ignore")]
    persistence: Arc<P>,
    /// Worker count and per-fetch timeout.
    cfg: ProposalFetcherConfig,
    /// Counters and gauges updated as requests are queued and processed.
    metrics: ProposalFetcherMetrics,
}
128
129impl<N, P> ProposalFetcher<N, P>
130where
131 N: ConnectedNetwork<PubKey>,
132 P: SequencerPersistence,
133{
134 #[tracing::instrument(skip_all)]
135 async fn scan(self) {
136 let mut events = self.consensus_handle.event_stream();
137 while let Some(event) = events.next().await {
138 let (parent_view, parent_leaf) = match event {
139 CoordinatorEvent::LegacyEvent(Event {
140 event: EventType::QuorumProposal { proposal, .. },
141 ..
142 }) => {
143 let parent_view = proposal.data.justify_qc().view_number;
144 let parent_leaf = proposal.data.justify_qc().data.leaf_commit;
145 (parent_view, parent_leaf)
146 },
147 CoordinatorEvent::QuorumProposal { proposal, .. } => {
148 let parent_view = proposal.data.justify_qc.view_number;
149 let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
150 (parent_view, parent_leaf)
151 },
152 _ => continue,
153 };
154 self.request((parent_view, parent_leaf)).await;
157 }
158 }
159
160 #[tracing::instrument(skip_all)]
161 async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>) {
162 let mut receiver = std::pin::pin!(receiver);
163 while let Some(req) = receiver.next().await {
164 self.fetch_request(req).await;
165 }
166 }
167
168 async fn request(&self, req: Request) {
169 self.sender.send(req).await.ok();
170 self.metrics.queue_len.set(self.sender.len());
171 self.metrics.last_seen.set(req.0.u64() as usize);
172 }
173
174 async fn fetch_request(&self, (view, leaf): Request) {
175 let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
176 let res: anyhow::Result<()> = async {
177 let anchor_view = self
178 .persistence
179 .load_anchor_view()
180 .await
181 .context("loading anchor view")?;
182 if view <= anchor_view {
183 tracing::debug!(?anchor_view, "skipping already-decided proposal");
184 return Ok(());
185 }
186
187 match self.persistence.load_quorum_proposal(view).await {
188 Ok(proposal) => {
189 let view = proposal.data.justify_qc().view_number;
192 let leaf = proposal.data.justify_qc().data.leaf_commit;
193 self.request((view, leaf)).await;
194 return Ok(());
195 },
196 Err(err) => {
197 tracing::info!("proposal missing from storage; fetching from network: {err:#}");
198 },
199 }
200
201 let future = self.consensus_handle.request_proposal(view, leaf).await?;
202 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
203 timeout(self.cfg.fetch_timeout, future)
204 .await
205 .context("timed out fetching proposal")?
206 .context("error fetching proposal")?;
207 self.persistence
208 .append_quorum_proposal2(&proposal)
209 .await
210 .context("error saving fetched proposal")?;
211
212 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
216 if self.consensus_handle.state(view).await.is_none() {
217 let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
218 if let Err(err) = self.consensus_handle.update_leaf(leaf, state, None).await {
219 tracing::info!("unable to update leaf: {err:#}");
220 }
221 }
222
223 self.metrics.last_fetched.set(view.u64() as usize);
224 self.metrics.fetched.add(1);
225
226 Ok(())
227 }
228 .instrument(span)
229 .await;
230 if let Err(err) = res {
231 tracing::warn!("failed to fetch proposal: {err:#}");
232 self.metrics.failed.add(1);
233
234 sleep(Duration::from_secs(1)).await;
236
237 self.request((view, leaf)).await;
240 }
241 }
242}