1use std::{sync::Arc, time::Duration};
2
3use anyhow::Context;
4use async_channel::{Receiver, Sender};
5use async_lock::RwLock;
6use clap::Parser;
7use committable::Commitment;
8use derivative::Derivative;
9use espresso_types::{PubKey, ValidatedState, parse_duration, v0::traits::SequencerPersistence};
10use futures::stream::StreamExt;
11use hotshot::types::EventType;
12use hotshot_types::{
13 data::{Leaf2, ViewNumber},
14 traits::{
15 ValidatedState as _,
16 metrics::{Counter, Gauge, Metrics},
17 network::ConnectedNetwork,
18 },
19 utils::{View, ViewInner},
20};
21use tokio::time::{sleep, timeout};
22use tracing::Instrument;
23
24use crate::{
25 SeqTypes,
26 context::{Consensus, TaskList},
27};
28
29#[derive(Clone, Copy, Debug, Parser)]
30pub struct ProposalFetcherConfig {
31 #[clap(
32 long = "proposal-fetcher-num-workers",
33 env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
34 default_value = "2"
35 )]
36 pub num_workers: usize,
37
38 #[clap(
39 long = "proposal-fetcher-fetch-timeout",
40 env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
41 default_value = "2s",
42 value_parser = parse_duration,
43 )]
44 pub fetch_timeout: Duration,
45}
46
47impl Default for ProposalFetcherConfig {
48 fn default() -> Self {
49 Self::parse_from(std::iter::empty::<String>())
50 }
51}
52
53impl ProposalFetcherConfig {
54 pub(crate) fn spawn<N, P>(
55 self,
56 tasks: &mut TaskList,
57 consensus: Arc<RwLock<Consensus<N, P>>>,
58 persistence: Arc<P>,
59 metrics: &(impl Metrics + ?Sized),
60 ) where
61 N: ConnectedNetwork<PubKey>,
62 P: SequencerPersistence,
63 {
64 let (sender, receiver) = async_channel::unbounded();
65 let fetcher = ProposalFetcher {
66 sender,
67 consensus,
68 persistence,
69 cfg: self,
70 metrics: ProposalFetcherMetrics::new(metrics),
71 };
72
73 tasks.spawn("proposal scanner", fetcher.clone().scan());
74 for i in 0..self.num_workers {
75 tasks.spawn(
76 format!("proposal fetcher {i}"),
77 fetcher.clone().fetch(receiver.clone()),
78 );
79 }
80 }
81}
82
83#[derive(Clone, Debug)]
84struct ProposalFetcherMetrics {
85 fetched: Arc<dyn Counter>,
86 failed: Arc<dyn Counter>,
87 queue_len: Arc<dyn Gauge>,
88 last_seen: Arc<dyn Gauge>,
89 last_fetched: Arc<dyn Gauge>,
90}
91
92impl ProposalFetcherMetrics {
93 fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
94 let metrics = metrics.subgroup("proposal_fetcher".into());
95 Self {
96 fetched: metrics.create_counter("fetched".into(), None).into(),
97 failed: metrics.create_counter("failed".into(), None).into(),
98 queue_len: metrics.create_gauge("queue_len".into(), None).into(),
99 last_seen: metrics
100 .create_gauge("last_seen".into(), Some("view".into()))
101 .into(),
102 last_fetched: metrics
103 .create_gauge("last_fetched".into(), Some("view".into()))
104 .into(),
105 }
106 }
107}
108
109type Request = (ViewNumber, Commitment<Leaf2<SeqTypes>>);
110
111#[derive(Derivative)]
112#[derivative(Clone(bound = ""), Debug(bound = ""))]
113struct ProposalFetcher<N, P>
114where
115 N: ConnectedNetwork<PubKey>,
116 P: SequencerPersistence,
117{
118 sender: Sender<Request>,
119 #[derivative(Debug = "ignore")]
120 consensus: Arc<RwLock<Consensus<N, P>>>,
121 #[derivative(Debug = "ignore")]
122 persistence: Arc<P>,
123 cfg: ProposalFetcherConfig,
124 metrics: ProposalFetcherMetrics,
125}
126
127impl<N, P> ProposalFetcher<N, P>
128where
129 N: ConnectedNetwork<PubKey>,
130 P: SequencerPersistence,
131{
132 #[tracing::instrument(skip_all)]
133 async fn scan(self) {
134 let mut events = self.consensus.read().await.event_stream();
135 while let Some(event) = events.next().await {
136 let EventType::QuorumProposal { proposal, .. } = event.event else {
137 continue;
138 };
139 let parent_view = proposal.data.justify_qc().view_number;
142 let parent_leaf = proposal.data.justify_qc().data.leaf_commit;
143 self.request((parent_view, parent_leaf)).await;
144 }
145 }
146
147 #[tracing::instrument(skip_all)]
148 async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>) {
149 let mut receiver = std::pin::pin!(receiver);
150 while let Some(req) = receiver.next().await {
151 self.fetch_request(req).await;
152 }
153 }
154
155 async fn request(&self, req: Request) {
156 self.sender.send(req).await.ok();
157 self.metrics.queue_len.set(self.sender.len());
158 self.metrics.last_seen.set(req.0.u64() as usize);
159 }
160
161 async fn fetch_request(&self, (view, leaf): Request) {
162 let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
163 let res: anyhow::Result<()> = async {
164 let anchor_view = self
165 .persistence
166 .load_anchor_view()
167 .await
168 .context("loading anchor view")?;
169 if view <= anchor_view {
170 tracing::debug!(?anchor_view, "skipping already-decided proposal");
171 return Ok(());
172 }
173
174 match self.persistence.load_quorum_proposal(view).await {
175 Ok(proposal) => {
176 let view = proposal.data.justify_qc().view_number;
179 let leaf = proposal.data.justify_qc().data.leaf_commit;
180 self.request((view, leaf)).await;
181 return Ok(());
182 },
183 Err(err) => {
184 tracing::info!("proposal missing from storage; fetching from network: {err:#}");
185 },
186 }
187
188 let future = self.consensus.read().await.request_proposal(view, leaf)?;
189 let proposal = timeout(self.cfg.fetch_timeout, future)
190 .await
191 .context("timed out fetching proposal")?
192 .context("error fetching proposal")?;
193 self.persistence
194 .append_quorum_proposal2(&proposal)
195 .await
196 .context("error saving fetched proposal")?;
197
198 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
200 let handle = self.consensus.read().await;
201 let consensus = handle.consensus();
202 let mut consensus = consensus.write().await;
203 if matches!(
204 consensus.validated_state_map().get(&view),
205 None | Some(View {
206 view_inner: ViewInner::Da { .. }
208 })
209 ) {
210 let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
211 if let Err(err) = consensus.update_leaf(leaf, state, None) {
212 tracing::warn!("unable to update leaf: {err:#}");
213 }
214 }
215
216 self.metrics.last_fetched.set(view.u64() as usize);
217 self.metrics.fetched.add(1);
218
219 Ok(())
220 }
221 .instrument(span)
222 .await;
223 if let Err(err) = res {
224 tracing::warn!("failed to fetch proposal: {err:#}");
225 self.metrics.failed.add(1);
226
227 sleep(Duration::from_secs(1)).await;
229
230 self.request((view, leaf)).await;
233 }
234 }
235}