1use std::{
8 collections::{BTreeMap, BTreeSet},
9 sync::{
10 Arc,
11 atomic::{AtomicBool, Ordering},
12 },
13 time::Duration,
14};
15
16use async_broadcast::{Receiver, Sender};
17use async_trait::async_trait;
18use hotshot_task::task::TaskState;
19use hotshot_types::{
20 consensus::OuterConsensus,
21 data::{EpochNumber, ViewNumber},
22 epoch_membership::EpochMembershipCoordinator,
23 simple_vote::HasEpoch,
24 traits::{
25 block_contents::BlockHeader,
26 network::{ConnectedNetwork, DataRequest, RequestKind},
27 node_implementation::{NodeImplementation, NodeType},
28 signature_key::SignatureKey,
29 },
30 utils::is_epoch_transition,
31 vote::HasViewNumber,
32};
33use hotshot_utils::anytrace::*;
34use rand::{seq::SliceRandom, thread_rng};
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::JoinHandle, time::sleep};
37use tracing::instrument;
38
39use crate::{events::HotShotEvent, helpers::broadcast_event};
40
41pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500);
43
44pub struct NetworkRequestState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
49 pub network: Arc<I::Network>,
52
53 pub consensus: OuterConsensus<TYPES>,
56
57 pub view: ViewNumber,
59
60 pub delay: Duration,
62
63 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
65
66 pub public_key: TYPES::SignatureKey,
68
69 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
71
72 pub id: u64,
74
75 pub shutdown_flag: Arc<AtomicBool>,
77
78 pub spawned_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
80
81 pub epoch_height: u64,
83}
84
85impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Drop for NetworkRequestState<TYPES, I> {
86 fn drop(&mut self) {
87 self.cancel_subtasks();
88 }
89}
90
91type Signature<TYPES> =
93 <<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType;
94
95#[async_trait]
96impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequestState<TYPES, I> {
97 type Event = HotShotEvent<TYPES>;
98
99 #[instrument(skip_all, fields(id = self.id), name = "NetworkRequestState")]
100 async fn handle_event(
101 &mut self,
102 event: Arc<Self::Event>,
103 sender: &Sender<Arc<Self::Event>>,
104 _receiver: &Receiver<Arc<Self::Event>>,
105 ) -> Result<()> {
106 match event.as_ref() {
107 HotShotEvent::QuorumProposalValidated(proposal, _) => {
108 let prop_view = proposal.data.view_number();
109 let prop_epoch = proposal.data.epoch();
110
111 let membership = self
115 .membership_coordinator
116 .stake_table_for_epoch(prop_epoch)?;
117 let mut target_epochs = BTreeSet::new();
118 if membership.has_stake(&self.public_key) {
119 target_epochs.insert(prop_epoch);
120 }
121 if is_epoch_transition(
122 proposal.data.block_header().block_number(),
123 self.epoch_height,
124 ) && membership
125 .next_epoch_stake_table()?
126 .has_stake(&self.public_key)
127 {
128 target_epochs.insert(prop_epoch.map(|e| e + 1));
129 }
130
131 ensure!(
132 !target_epochs.is_empty(),
133 "We don't belong to the current epoch and we don't belong to the next epoch. \
134 Do not request VID share."
135 );
136
137 let consensus_reader = self.consensus.read().await;
138 let maybe_vid_share = consensus_reader
139 .vid_shares()
140 .get(&prop_view)
141 .and_then(|shares| shares.get(&self.public_key));
142 if prop_view >= self.view
144 && (maybe_vid_share.is_none()
145 || !target_epochs
146 .iter()
147 .all(|e| maybe_vid_share.unwrap().contains_key(e)))
148 {
149 drop(consensus_reader);
150 self.spawn_requests(prop_view, prop_epoch, sender, target_epochs)
151 .await;
152 }
153 Ok(())
154 },
155 HotShotEvent::VidResponseRecv(sender_key, vid_proposal) => {
156 let view = vid_proposal.data.view_number();
157 let epoch = vid_proposal.data.epoch();
158
159 let membership_reader = self.membership_coordinator.membership_for_epoch(epoch)?;
161 let mut da_committee_for_view: BTreeSet<_> = membership_reader
162 .da_committee_members(view)
163 .cloned()
164 .collect();
165 if let Ok(leader) = membership_reader.leader(view) {
166 da_committee_for_view.insert(leader);
167 }
168 drop(membership_reader);
169
170 ensure!(
171 self.spawned_tasks.contains_key(&view),
172 info!("Received VidResponseRecv for view we didn't expect, view {view:?}")
173 );
174
175 ensure!(
176 da_committee_for_view.contains(sender_key),
177 warn!("Received VidResponseRecv from unexpected sender key {sender_key:?}")
178 );
179
180 ensure!(
181 sender_key.validate(
182 &vid_proposal.signature,
183 vid_proposal.data.payload_commitment_ref()
184 ),
185 warn!("Received VidResponseRecv with invalid signature")
186 );
187
188 tracing::debug!("Received VidResponseRecv {vid_proposal:?}");
189 broadcast_event(
190 Arc::new(HotShotEvent::VidShareRecv(
191 sender_key.clone(),
192 vid_proposal.clone(),
193 )),
194 sender,
195 )
196 .await;
197 Ok(())
198 },
199 HotShotEvent::ViewChange(view, _) => {
200 let view = *view;
201 if view > self.view {
202 self.view = view;
203 }
204 self.spawned_tasks
206 .range_mut(..self.view)
207 .for_each(|(_, handles)| {
208 handles.retain(|handle| !handle.is_finished());
209 });
210 self.spawned_tasks
211 .retain(|view, handles| view >= &self.view || !handles.is_empty());
212 Ok(())
213 },
214 _ => Ok(()),
215 }
216 }
217
218 fn cancel_subtasks(&mut self) {
219 self.shutdown_flag.store(true, Ordering::Relaxed);
220
221 while !self.spawned_tasks.is_empty() {
222 let Some((_, handles)) = self.spawned_tasks.pop_first() else {
223 break;
224 };
225
226 for handle in handles {
227 handle.abort();
228 }
229 }
230 }
231}
232
233impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I> {
234 async fn spawn_requests(
236 &mut self,
237 view: ViewNumber,
238 prop_epoch: Option<EpochNumber>,
239 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
240 target_epochs: BTreeSet<Option<EpochNumber>>,
241 ) {
242 let request = RequestKind::Vid(view, self.public_key.clone());
243
244 if let Some(signature) = self.serialize_and_sign(&request) {
246 self.create_vid_request_task(
247 request,
248 signature,
249 sender.clone(),
250 view,
251 prop_epoch,
252 target_epochs,
253 )
254 .await;
255 }
256 }
257
258 async fn create_vid_request_task(
261 &mut self,
262 request: RequestKind<TYPES>,
263 signature: Signature<TYPES>,
264 sender: Sender<Arc<HotShotEvent<TYPES>>>,
265 view: ViewNumber,
266 prop_epoch: Option<EpochNumber>,
267 mut target_epochs: BTreeSet<Option<EpochNumber>>,
268 ) {
269 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
270 let network = Arc::clone(&self.network);
271 let shutdown_flag = Arc::clone(&self.shutdown_flag);
272 let delay = self.delay;
273 let public_key = self.public_key.clone();
274
275 let membership_reader = match self.membership_coordinator.membership_for_epoch(prop_epoch) {
277 Ok(m) => m,
278 Err(e) => {
279 tracing::warn!(e.message);
280 return;
281 },
282 };
283 let mut recipients: Vec<TYPES::SignatureKey> = membership_reader
285 .da_committee_members(view)
286 .cloned()
287 .collect();
288
289 recipients.shuffle(&mut thread_rng());
292
293 let data_request = DataRequest::<TYPES> {
295 request,
296 view,
297 signature,
298 };
299 let my_id = self.id;
300 let handle: JoinHandle<()> = spawn(async move {
301 if !network.is_primary_down() {
303 sleep(delay).await;
304 }
305
306 let mut recipients_it = recipients.iter();
307 while !Self::cancel_vid_request_task(
309 &consensus,
310 &public_key,
311 &view,
312 &shutdown_flag,
313 my_id,
314 &mut target_epochs,
315 )
316 .await
317 {
318 if let Some(recipient) = recipients_it.next() {
320 if *recipient == public_key {
321 continue;
324 }
325 tracing::debug!("Sending VidRequestSend {data_request:?}, my id {my_id:?}");
326 broadcast_event(
328 HotShotEvent::VidRequestSend(
329 data_request.clone(),
330 public_key.clone(),
331 recipient.clone(),
332 )
333 .into(),
334 &sender,
335 )
336 .await;
337 sleep(REQUEST_TIMEOUT).await;
339 } else {
340 tracing::warn!(
342 "Sent VID request to all available DA members and got no response for \
343 view: {view:?}, my id: {my_id:?}"
344 );
345 return;
346 }
347 }
348 });
349 self.spawned_tasks.entry(view).or_default().push(handle);
350 }
351
352 async fn cancel_vid_request_task(
354 consensus: &OuterConsensus<TYPES>,
355 public_key: &<TYPES as NodeType>::SignatureKey,
356 view: &ViewNumber,
357 shutdown_flag: &Arc<AtomicBool>,
358 id: u64,
359 target_epochs: &mut BTreeSet<Option<EpochNumber>>,
360 ) -> bool {
361 let consensus_reader = consensus.read().await;
362
363 let maybe_vid_shares = consensus_reader
364 .vid_shares()
365 .get(view)
366 .and_then(|key_map| key_map.get(public_key));
367 if let Some(vid_shares) = maybe_vid_shares {
368 tracing::debug!("Send own vid share: {vid_shares:?}, my id {id:?}");
369 for vid_share in vid_shares.values() {
370 target_epochs.remove(&vid_share.data.target_epoch());
371 }
372 }
373 let cancel = shutdown_flag.load(Ordering::Relaxed)
374 || consensus_reader.cur_view() > *view
375 || target_epochs.is_empty();
376 if cancel {
377 tracing::debug!(
378 "Canceling vid request for view {:?}, cur view is {:?}, my id {:?}",
379 view,
380 consensus_reader.cur_view(),
381 id,
382 );
383 }
384 cancel
385 }
386
387 fn serialize_and_sign(&self, request: &RequestKind<TYPES>) -> Option<Signature<TYPES>> {
389 let Ok(data) = bincode::serialize(&request) else {
390 tracing::error!("Failed to serialize request!");
391 return None;
392 };
393 let Ok(signature) = TYPES::SignatureKey::sign(&self.private_key, &Sha256::digest(data))
394 else {
395 tracing::error!("Failed to sign Data Request");
396 return None;
397 };
398 Some(signature)
399 }
400}