1use std::{
8 collections::{BTreeMap, BTreeSet, HashMap},
9 hash::{DefaultHasher, Hash},
10 sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_libp2p_networking::network::log_summary::LogEvent;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18 consensus::OuterConsensus,
19 data::{EpochNumber, VidDisperse, VidDisperseShare, ViewNumber},
20 epoch_membership::EpochMembershipCoordinator,
21 event::{Event, EventType, HotShotAction},
22 message::{
23 DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal,
24 SequencingMessage, UpgradeLock, convert_proposal,
25 },
26 simple_vote::HasEpoch,
27 storage_metrics::StorageMetricsValue,
28 traits::{
29 network::{
30 BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
31 ViewMessage,
32 },
33 node_implementation::NodeType,
34 storage::Storage,
35 },
36 vote::{HasViewNumber, Vote},
37};
38use hotshot_utils::anytrace::*;
39use tokio::{spawn, task::JoinHandle, time::Instant};
40use tracing::instrument;
41
42use crate::{
43 events::{HotShotEvent, HotShotTaskCompleted},
44 helpers::broadcast_event,
45};
46
/// State for the task that turns messages received from the network into events.
#[derive(Clone)]
pub struct NetworkMessageTaskState<TYPES: NodeType> {
    /// Stream on which converted consensus and data messages are broadcast as `HotShotEvent`s.
    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Stream on which `External` messages are broadcast as `Event`s.
    pub external_event_stream: Sender<Event<TYPES>>,

    /// Compared against the sender of `External` messages; messages from this key are dropped.
    pub public_key: TYPES::SignatureKey,

    /// Consulted to decide whether a received message type is valid for the message's view.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Recorded as the `id` field of the tracing span around `handle_message`.
    pub id: u64,
}
65
66impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {
67 #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
68 pub async fn handle_message(&mut self, message: Message<TYPES>) {
70 match &message.kind {
71 MessageKind::Consensus(_) => {
72 tracing::debug!("Received consensus message from network: {:?}", message)
73 },
74 MessageKind::Data(_) => {
75 tracing::trace!("Received data message from network: {:?}", message)
76 },
77 MessageKind::External(_) => {
78 tracing::trace!("Received external message from network: {:?}", message)
79 },
80 }
81
82 let sender = message.sender;
84 match message.kind {
85 MessageKind::Consensus(consensus_message) => {
87 let event = match consensus_message {
88 SequencingMessage::General(general_message) => match general_message {
89 GeneralConsensusMessage::Proposal(proposal) => {
90 if self
91 .upgrade_lock
92 .epochs_enabled(proposal.data.view_number())
93 {
94 tracing::warn!(
95 "received GeneralConsensusMessage::Proposal for view {} but \
96 epochs are enabled for that view",
97 proposal.data.view_number()
98 );
99 return;
100 }
101 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
102 },
103 GeneralConsensusMessage::Proposal2Legacy(proposal) => {
104 if !self
105 .upgrade_lock
106 .proposal2_legacy_version(proposal.data.view_number())
107 {
108 tracing::warn!(
109 "received GeneralConsensusMessage::Proposal2Legacy for view \
110 {} but we are in the wrong version for that message type",
111 proposal.data.view_number()
112 );
113 return;
114 }
115 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
116 },
117 GeneralConsensusMessage::Proposal2(proposal) => {
118 if !self
119 .upgrade_lock
120 .proposal2_version(proposal.data.view_number())
121 {
122 tracing::warn!(
123 "received GeneralConsensusMessage::Proposal2 for view {} but \
124 we are in the wrong version for that message type",
125 proposal.data.view_number()
126 );
127 return;
128 }
129 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
130 },
131 GeneralConsensusMessage::ProposalRequested(req, sig) => {
132 HotShotEvent::QuorumProposalRequestRecv(req, sig)
133 },
134 GeneralConsensusMessage::ProposalResponse(proposal) => {
135 if self
136 .upgrade_lock
137 .epochs_enabled(proposal.data.view_number())
138 {
139 tracing::warn!(
140 "received GeneralConsensusMessage::ProposalResponse for view \
141 {} but we are in the wrong version for that message type",
142 proposal.data.view_number()
143 );
144 return;
145 }
146 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
147 },
148 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
149 if !self
150 .upgrade_lock
151 .proposal2_legacy_version(proposal.data.view_number())
152 {
153 tracing::warn!(
154 "received GeneralConsensusMessage::ProposalResponse2Legacy \
155 for view {} but we are in the wrong version for that message \
156 type",
157 proposal.data.view_number()
158 );
159 return;
160 }
161 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
162 },
163 GeneralConsensusMessage::ProposalResponse2(proposal) => {
164 if !self
165 .upgrade_lock
166 .proposal2_version(proposal.data.view_number())
167 {
168 tracing::warn!(
169 "received GeneralConsensusMessage::ProposalResponse2 for view \
170 {} but epochs are not enabled for that view",
171 proposal.data.view_number()
172 );
173 return;
174 }
175 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
176 },
177 GeneralConsensusMessage::Vote(vote) => {
178 if self.upgrade_lock.epochs_enabled(vote.view_number()) {
179 tracing::warn!(
180 "received GeneralConsensusMessage::Vote for view {} but \
181 epochs are enabled for that view",
182 vote.view_number()
183 );
184 return;
185 }
186 HotShotEvent::QuorumVoteRecv(vote.to_vote2())
187 },
188 GeneralConsensusMessage::Vote2(vote) => {
189 if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
190 tracing::warn!(
191 "received GeneralConsensusMessage::Vote2 for view {} but \
192 epochs are not enabled for that view",
193 vote.view_number()
194 );
195 return;
196 }
197 HotShotEvent::QuorumVoteRecv(vote)
198 },
199 GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
200 if self
201 .upgrade_lock
202 .epochs_enabled(view_sync_message.view_number())
203 {
204 tracing::warn!(
205 "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
206 view {} but epochs are enabled for that view",
207 view_sync_message.view_number()
208 );
209 return;
210 }
211 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
212 },
213 GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
214 if !self
215 .upgrade_lock
216 .epochs_enabled(view_sync_message.view_number())
217 {
218 tracing::warn!(
219 "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
220 view {} but epochs are not enabled for that view",
221 view_sync_message.view_number()
222 );
223 return;
224 }
225 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
226 },
227 GeneralConsensusMessage::ViewSyncPreCommitCertificate(
228 view_sync_message,
229 ) => {
230 if self
231 .upgrade_lock
232 .epochs_enabled(view_sync_message.view_number())
233 {
234 tracing::warn!(
235 "received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view",
236 view_sync_message.view_number()
237 );
238 return;
239 }
240 HotShotEvent::ViewSyncPreCommitCertificateRecv(
241 view_sync_message.to_vsc2(),
242 )
243 },
244 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
245 view_sync_message,
246 ) => {
247 if !self
248 .upgrade_lock
249 .epochs_enabled(view_sync_message.view_number())
250 {
251 tracing::warn!(
252 "received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view",
253 view_sync_message.view_number()
254 );
255 return;
256 }
257 HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
258 },
259 GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
260 if self
261 .upgrade_lock
262 .epochs_enabled(view_sync_message.view_number())
263 {
264 tracing::warn!(
265 "received GeneralConsensusMessage::ViewSyncCommitVote for \
266 view {} but epochs are enabled for that view",
267 view_sync_message.view_number()
268 );
269 return;
270 }
271 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
272 },
273 GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
274 if !self
275 .upgrade_lock
276 .epochs_enabled(view_sync_message.view_number())
277 {
278 tracing::warn!(
279 "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
280 view {} but epochs are not enabled for that view",
281 view_sync_message.view_number()
282 );
283 return;
284 }
285 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
286 },
287 GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
288 if self
289 .upgrade_lock
290 .epochs_enabled(view_sync_message.view_number())
291 {
292 tracing::warn!(
293 "received GeneralConsensusMessage::ViewSyncCommitCertificate \
294 for view {} but epochs are enabled for that view",
295 view_sync_message.view_number()
296 );
297 return;
298 }
299 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
300 },
301 GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
302 if !self
303 .upgrade_lock
304 .epochs_enabled(view_sync_message.view_number())
305 {
306 tracing::warn!(
307 "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
308 for view {} but epochs are not enabled for that view",
309 view_sync_message.view_number()
310 );
311 return;
312 }
313 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
314 },
315 GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
316 if self
317 .upgrade_lock
318 .epochs_enabled(view_sync_message.view_number())
319 {
320 tracing::warn!(
321 "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
322 view {} but epochs are enabled for that view",
323 view_sync_message.view_number()
324 );
325 return;
326 }
327 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
328 },
329 GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
330 if !self
331 .upgrade_lock
332 .epochs_enabled(view_sync_message.view_number())
333 {
334 tracing::warn!(
335 "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
336 view {} but epochs are not enabled for that view",
337 view_sync_message.view_number()
338 );
339 return;
340 }
341 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
342 },
343 GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
344 if self
345 .upgrade_lock
346 .epochs_enabled(view_sync_message.view_number())
347 {
348 tracing::warn!(
349 "received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view",
350 view_sync_message.view_number()
351 );
352 return;
353 }
354 HotShotEvent::ViewSyncFinalizeCertificateRecv(
355 view_sync_message.to_vsc2(),
356 )
357 },
358 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
359 view_sync_message,
360 ) => {
361 if !self
362 .upgrade_lock
363 .epochs_enabled(view_sync_message.view_number())
364 {
365 tracing::warn!(
366 "received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view",
367 view_sync_message.view_number()
368 );
369 return;
370 }
371 HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
372 },
373 GeneralConsensusMessage::TimeoutVote(message) => {
374 if self.upgrade_lock.epochs_enabled(message.view_number()) {
375 tracing::warn!(
376 "received GeneralConsensusMessage::TimeoutVote for view {} \
377 but epochs are enabled for that view",
378 message.view_number()
379 );
380 return;
381 }
382 HotShotEvent::TimeoutVoteRecv(message.to_vote2())
383 },
384 GeneralConsensusMessage::TimeoutVote2(message) => {
385 if !self.upgrade_lock.epochs_enabled(message.view_number()) {
386 tracing::warn!(
387 "received GeneralConsensusMessage::TimeoutVote2 for view {} \
388 but epochs are not enabled for that view",
389 message.view_number()
390 );
391 return;
392 }
393 HotShotEvent::TimeoutVoteRecv(message)
394 },
395 GeneralConsensusMessage::UpgradeProposal(message) => {
396 HotShotEvent::UpgradeProposalRecv(message, sender)
397 },
398 GeneralConsensusMessage::UpgradeVote(message) => {
399 tracing::error!("Received upgrade vote!");
400 HotShotEvent::UpgradeVoteRecv(message)
401 },
402 GeneralConsensusMessage::HighQc(qc, next_qc) => {
403 HotShotEvent::HighQcRecv(qc, next_qc, sender)
404 },
405 GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
406 HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
407 },
408 GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
409 if !self
410 .upgrade_lock
411 .proposal2_legacy_version(vote.view_number())
412 {
413 tracing::warn!(
414 "received GeneralConsensusMessage::EpochRootQuorumVote for \
415 view {} but we do not expect this message in this version",
416 vote.view_number()
417 );
418 return;
419 }
420 HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
421 },
422 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
423 if !self.upgrade_lock.proposal2_version(vote.view_number()) {
424 tracing::warn!(
425 "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
426 view {} but we do not expect this message in this version",
427 vote.view_number()
428 );
429 return;
430 }
431 HotShotEvent::EpochRootQuorumVoteRecv(vote)
432 },
433 GeneralConsensusMessage::EpochRootQc(root_qc) => {
434 if !self.upgrade_lock.proposal2_version(root_qc.view_number()) {
435 tracing::warn!(
436 "received GeneralConsensusMessage::EpochRootQc for view {} \
437 but we are in the wrong version for that message types",
438 root_qc.view_number()
439 );
440 return;
441 }
442 HotShotEvent::EpochRootQcRecv(root_qc, sender)
443 },
444 GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
445 if !self
446 .upgrade_lock
447 .proposal2_legacy_version(root_qc.view_number())
448 {
449 tracing::warn!(
450 "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
451 but we are in the wrong version for that message type",
452 root_qc.view_number()
453 );
454 return;
455 }
456 HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
457 },
458 },
459 SequencingMessage::Da(da_message) => match da_message {
460 DaConsensusMessage::DaProposal(proposal) => {
461 if self
462 .upgrade_lock
463 .epochs_enabled(proposal.data.view_number())
464 {
465 tracing::warn!(
466 "received DaConsensusMessage::DaProposal for view {} but \
467 epochs are enabled for that view",
468 proposal.data.view_number()
469 );
470 return;
471 }
472 HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
473 },
474 DaConsensusMessage::DaProposal2(proposal) => {
475 if !self
476 .upgrade_lock
477 .epochs_enabled(proposal.data.view_number())
478 {
479 tracing::warn!(
480 "received DaConsensusMessage::DaProposal2 for view {} but \
481 epochs are not enabled for that view",
482 proposal.data.view_number()
483 );
484 return;
485 }
486 HotShotEvent::DaProposalRecv(proposal, sender)
487 },
488 DaConsensusMessage::DaVote(vote) => {
489 if self.upgrade_lock.epochs_enabled(vote.view_number()) {
490 tracing::warn!(
491 "received DaConsensusMessage::DaVote for view {} but epochs \
492 are enabled for that view",
493 vote.view_number()
494 );
495 return;
496 }
497 HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
498 },
499 DaConsensusMessage::DaVote2(vote) => {
500 if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
501 tracing::warn!(
502 "received DaConsensusMessage::DaVote2 for view {} but epochs \
503 are not enabled for that view",
504 vote.view_number()
505 );
506 return;
507 }
508 HotShotEvent::DaVoteRecv(vote.clone())
509 },
510 DaConsensusMessage::DaCertificate(cert) => {
511 if self.upgrade_lock.epochs_enabled(cert.view_number()) {
512 tracing::warn!(
513 "received DaConsensusMessage::DaCertificate for view {} but \
514 epochs are enabled for that view",
515 cert.view_number()
516 );
517 return;
518 }
519 HotShotEvent::DaCertificateRecv(cert.to_dac2())
520 },
521 DaConsensusMessage::DaCertificate2(cert) => {
522 if !self.upgrade_lock.epochs_enabled(cert.view_number()) {
523 tracing::warn!(
524 "received DaConsensusMessage::DaCertificate2 for view {} but \
525 epochs are not enabled for that view",
526 cert.view_number()
527 );
528 return;
529 }
530 HotShotEvent::DaCertificateRecv(cert)
531 },
532 DaConsensusMessage::VidDisperseMsg(proposal) => {
533 if self
534 .upgrade_lock
535 .epochs_enabled(proposal.data.view_number())
536 {
537 tracing::warn!(
538 "received DaConsensusMessage::VidDisperseMsg for view {} but \
539 epochs are enabled for that view",
540 proposal.data.view_number()
541 );
542 return;
543 }
544 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
545 },
546 DaConsensusMessage::VidDisperseMsg1(proposal) => {
547 if !self
548 .upgrade_lock
549 .epochs_enabled(proposal.data.view_number())
550 {
551 tracing::warn!(
552 "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
553 epochs are not enabled for that view",
554 proposal.data.view_number()
555 );
556 return;
557 }
558 if self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
559 tracing::warn!(
560 "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
561 vid2 upgrade is enabled for that view",
562 proposal.data.view_number()
563 );
564 return;
565 }
566 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
567 },
568 DaConsensusMessage::VidDisperseMsg2(proposal) => {
569 if !self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
570 tracing::warn!(
571 "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
572 vid2 upgrade is not enabled for that view",
573 proposal.data.view_number()
574 );
575 return;
576 }
577 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
578 },
579 },
580 };
581 broadcast_event(Arc::new(event), &self.internal_event_stream).await;
582 },
583
584 MessageKind::Data(message) => match message {
586 DataMessage::SubmitTransaction(transaction, _) => {
587 let mut hasher = DefaultHasher::new();
588 transaction.hash(&mut hasher);
589 broadcast_event(
590 Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
591 &self.internal_event_stream,
592 )
593 .await;
594 },
595 DataMessage::DataResponse(response) => {
596 if let ResponseMessage::Found(message) = response {
597 match message {
598 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
599 broadcast_event(
600 Arc::new(HotShotEvent::VidResponseRecv(
601 sender,
602 convert_proposal(proposal),
603 )),
604 &self.internal_event_stream,
605 )
606 .await;
607 },
608 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
609 proposal,
610 )) => {
611 broadcast_event(
612 Arc::new(HotShotEvent::VidResponseRecv(
613 sender,
614 convert_proposal(proposal),
615 )),
616 &self.internal_event_stream,
617 )
618 .await;
619 },
620 _ => {},
621 }
622 }
623 },
624 DataMessage::RequestData(data) => {
625 let req_data = data.clone();
626 if let RequestKind::Vid(_view_number, _key) = req_data.request {
627 broadcast_event(
628 Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
629 &self.internal_event_stream,
630 )
631 .await;
632 }
633 },
634 },
635
636 MessageKind::External(data) => {
638 if sender == self.public_key {
639 return;
640 }
641 broadcast_event(
643 Event {
644 view_number: ViewNumber::new(1),
645 event: EventType::ExternalMessageReceived { sender, data },
646 },
647 &self.external_event_stream,
648 )
649 .await;
650 },
651 }
652 }
653}
654
/// State for the task that turns outgoing `HotShotEvent`s into network messages.
pub struct NetworkEventTaskState<
    TYPES: NodeType,
    NET: ConnectedNetwork<TYPES::SignatureKey>,
    S: Storage<TYPES>,
> {
    /// Network handle used to send messages (e.g. `vid_broadcast_message`).
    pub network: Arc<NET>,

    /// View number; recorded as the `view` field of the tracing span around `handle`.
    pub view: ViewNumber,

    /// Epoch passed to the membership lookup when picking the leader for view-sync votes.
    pub epoch: Option<EpochNumber>,

    /// Used to look up the membership for an epoch, e.g. to find the leader to send a vote to.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Storage in which actions are recorded before the corresponding message is sent.
    pub storage: S,

    /// Storage metrics handle (not used by the methods visible in this section).
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Shared consensus state; `update_action` is called on it before recording an action.
    pub consensus: OuterConsensus<TYPES>,

    /// Consulted to choose which versioned message type to send for a given view.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Join handles grouped by view; `cancel_tasks` aborts those for earlier views.
    pub transmit_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,

    /// Epoch height (not used by the methods visible in this section).
    pub epoch_height: u64,

    /// Recorded as the `id` field of the tracing span around `handle`.
    pub id: u64,
}
694
/// Task-framework glue: every event is delegated to `NetworkEventTaskState::handle`.
#[async_trait]
impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
    TaskState for NetworkEventTaskState<TYPES, NET, S>
{
    type Event = HotShotEvent<TYPES>;

    /// Forwards `event` to `handle`; the sender and receiver are unused and the call always
    /// returns `Ok(())`.
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        _sender: &Sender<Arc<Self::Event>>,
        _receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        self.handle(event).await;

        Ok(())
    }

    /// No-op: nothing is cancelled here.
    fn cancel_subtasks(&mut self) {}
}
714
715impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
716 NetworkEventTaskState<TYPES, NET, S>
717{
718 #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
722 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
723 let mut maybe_action = None;
724 if let Some((sender, message_kind, transmit)) =
725 self.parse_event(event, &mut maybe_action).await
726 {
727 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
728 .await;
729 };
730 }
731
732 async fn handle_vid_disperse_proposal(
734 &self,
735 vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
736 sender: &<TYPES as NodeType>::SignatureKey,
737 ) -> Option<HotShotTaskCompleted> {
738 let view = vid_proposal.data.view_number();
739 let epoch = vid_proposal.data.epoch();
740 let vid_share_proposals = VidDisperse::to_share_proposals(vid_proposal);
741 let mut messages = HashMap::new();
742
743 for proposal in vid_share_proposals {
744 let recipient = proposal.data.recipient_key().clone();
745 let epochs_enabled = self
746 .upgrade_lock
747 .epochs_enabled(proposal.data.view_number());
748 let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
749 let message = if !epochs_enabled {
750 let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
751 Proposal {
752 data,
753 signature: proposal.signature,
754 _pd: proposal._pd,
755 }
756 } else {
757 tracing::warn!(
758 "Epochs are not enabled for view {} but didn't receive VidDisperseShare1",
759 proposal.data.view_number()
760 );
761 return None;
762 };
763 Message {
764 sender: sender.clone(),
765 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
766 DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
767 )),
768 }
769 } else if !upgraded_vid2 {
770 let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
771 Proposal {
772 data,
773 signature: proposal.signature,
774 _pd: proposal._pd,
775 }
776 } else {
777 tracing::warn!(
778 "Epochs are enabled and Vid2Upgrade is not enabled for view {} but didn't \
779 receive VidDisperseShare2",
780 proposal.data.view_number()
781 );
782 return None;
783 };
784 Message {
785 sender: sender.clone(),
786 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
787 DaConsensusMessage::VidDisperseMsg1(vid_share_proposal),
788 )),
789 }
790 } else {
791 let vid_share_proposal = if let VidDisperseShare::V2(data) = proposal.data {
792 Proposal {
793 data,
794 signature: proposal.signature,
795 _pd: proposal._pd,
796 }
797 } else {
798 tracing::warn!(
799 "Vid2Upgrade is enabled for view {} but didn't receive VidDisperseShare2",
800 proposal.data.view_number()
801 );
802 return None;
803 };
804 Message {
805 sender: sender.clone(),
806 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
807 DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
808 )),
809 }
810 };
811 let view = message.view_number();
812 let serialized_message = match self.upgrade_lock.serialize(&message) {
813 Ok(serialized) => serialized,
814 Err(e) => {
815 tracing::error!("Failed to serialize message: {e}");
816 continue;
817 },
818 };
819
820 messages.insert(recipient, (view.u64().into(), serialized_message));
821 }
822
823 let net = Arc::clone(&self.network);
824 let storage = self.storage.clone();
825 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
826 spawn(async move {
827 if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
828 Some(HotShotAction::VidDisperse),
829 storage,
830 consensus,
831 view,
832 epoch,
833 )
834 .await
835 .is_err()
836 {
837 return;
838 }
839 match net.vid_broadcast_message(messages).await {
840 Ok(()) => {},
841 Err(e) => {
842 LogEvent::NetworkSendFailure.record();
843 tracing::debug!("Failed to send message from network task: {e:?}");
844 },
845 }
846 });
847
848 None
849 }
850
851 async fn maybe_record_action(
853 maybe_action: Option<HotShotAction>,
854 storage: S,
855 consensus: OuterConsensus<TYPES>,
856 view: ViewNumber,
857 epoch: Option<EpochNumber>,
858 ) -> std::result::Result<(), ()> {
859 if let Some(mut action) = maybe_action {
860 if !consensus.write().await.update_action(action, view) {
861 return Err(());
862 }
863 if matches!(action, HotShotAction::ViewSyncVote) {
866 action = HotShotAction::Vote;
867 }
868 match storage.record_action(view, epoch, action).await {
869 Ok(()) => Ok(()),
870 Err(e) => {
871 tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
872 Err(())
873 },
874 }
875 } else {
876 Ok(())
877 }
878 }
879
880 pub fn cancel_tasks(&mut self, view: ViewNumber) {
882 let keep = self.transmit_tasks.split_off(&view);
883
884 while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
885 for task in tasks {
886 task.abort();
887 }
888 }
889
890 self.transmit_tasks = keep;
891 }
892
893 #[allow(clippy::too_many_lines)]
898 async fn parse_event(
899 &mut self,
900 event: Arc<HotShotEvent<TYPES>>,
901 maybe_action: &mut Option<HotShotAction>,
902 ) -> Option<(
903 <TYPES as NodeType>::SignatureKey,
904 MessageKind<TYPES>,
905 TransmitType<TYPES>,
906 )> {
907 match event.as_ref().clone() {
908 HotShotEvent::QuorumProposalSend(proposal, sender) => {
909 *maybe_action = Some(HotShotAction::Propose);
910
911 let message = if self
912 .upgrade_lock
913 .proposal2_version(proposal.data.view_number())
914 {
915 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
916 GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
917 ))
918 } else if self
919 .upgrade_lock
920 .proposal2_legacy_version(proposal.data.view_number())
921 {
922 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
923 GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
924 ))
925 } else {
926 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
927 GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
928 ))
929 };
930
931 Some((sender, message, TransmitType::Broadcast))
932 },
933
934 HotShotEvent::QuorumVoteSend(vote) => {
936 *maybe_action = Some(HotShotAction::Vote);
937 let view_number = vote.view_number() + 1;
938 let leader = match self
939 .membership_coordinator
940 .membership_for_epoch(vote.epoch())
941 .ok()?
942 .leader(view_number)
943 {
944 Ok(l) => l,
945 Err(e) => {
946 tracing::warn!(
947 "Failed to calculate leader for view number {view_number}. Error: \
948 {e:?}"
949 );
950 return None;
951 },
952 };
953
954 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
955 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
956 GeneralConsensusMessage::Vote2(vote.clone()),
957 ))
958 } else {
959 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
960 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
961 ))
962 };
963
964 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
965 },
966 HotShotEvent::EpochRootQuorumVoteSend(vote) => {
967 *maybe_action = Some(HotShotAction::Vote);
968 let view_number = vote.view_number() + 1;
969 let leader = match self
970 .membership_coordinator
971 .membership_for_epoch(vote.epoch())
972 .ok()?
973 .leader(view_number)
974 {
975 Ok(l) => l,
976 Err(e) => {
977 tracing::warn!(
978 "Failed to calculate leader for view number {:?}. Error: {:?}",
979 view_number,
980 e
981 );
982 return None;
983 },
984 };
985
986 let message = if self.upgrade_lock.proposal2_version(vote.view_number()) {
987 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
988 GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
989 ))
990 } else if self
991 .upgrade_lock
992 .proposal2_legacy_version(vote.view_number())
993 {
994 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
995 GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
996 ))
997 } else {
998 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
999 GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1000 ))
1001 };
1002
1003 Some((
1004 vote.vote.signing_key(),
1005 message,
1006 TransmitType::Direct(leader),
1007 ))
1008 },
1009 HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1010 *maybe_action = Some(HotShotAction::Vote);
1011 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1012 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1013 GeneralConsensusMessage::Vote2(vote.clone()),
1014 ))
1015 } else {
1016 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1017 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1018 ))
1019 };
1020
1021 Some((vote.signing_key(), message, TransmitType::Broadcast))
1022 },
1023 HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1024 req.key.clone(),
1025 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1026 GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1027 )),
1028 TransmitType::Broadcast,
1029 )),
1030 HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1031 let message = if self
1032 .upgrade_lock
1033 .proposal2_version(proposal.data.view_number())
1034 {
1035 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1036 GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1037 ))
1038 } else if self
1039 .upgrade_lock
1040 .proposal2_legacy_version(proposal.data.view_number())
1041 {
1042 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1043 GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1044 proposal,
1045 )),
1046 ))
1047 } else {
1048 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1049 GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1050 ))
1051 };
1052
1053 Some((
1054 sender_key.clone(),
1055 message,
1056 TransmitType::Direct(sender_key),
1057 ))
1058 },
1059 HotShotEvent::VidDisperseSend(proposal, sender) => {
1060 self.handle_vid_disperse_proposal(proposal, &sender).await;
1061 None
1062 },
1063 HotShotEvent::DaProposalSend(proposal, sender) => {
1064 *maybe_action = Some(HotShotAction::DaPropose);
1065
1066 let message = if self
1067 .upgrade_lock
1068 .epochs_enabled(proposal.data.view_number())
1069 {
1070 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1071 DaConsensusMessage::DaProposal2(proposal),
1072 ))
1073 } else {
1074 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1075 DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1076 ))
1077 };
1078
1079 Some((sender, message, TransmitType::DaCommitteeBroadcast))
1080 },
1081 HotShotEvent::DaVoteSend(vote) => {
1082 *maybe_action = Some(HotShotAction::DaVote);
1083 let view_number = vote.view_number();
1084 let leader = match self
1085 .membership_coordinator
1086 .membership_for_epoch(vote.epoch())
1087 .ok()?
1088 .leader(view_number)
1089 {
1090 Ok(l) => l,
1091 Err(e) => {
1092 tracing::warn!(
1093 "Failed to calculate leader for view number {view_number}. Error: \
1094 {e:?}"
1095 );
1096 return None;
1097 },
1098 };
1099
1100 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1101 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1102 DaConsensusMessage::DaVote2(vote.clone()),
1103 ))
1104 } else {
1105 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1106 DaConsensusMessage::DaVote(vote.clone().to_vote()),
1107 ))
1108 };
1109
1110 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1111 },
1112 HotShotEvent::DacSend(certificate, sender) => {
1113 *maybe_action = Some(HotShotAction::DaCert);
1114 let message = if self.upgrade_lock.epochs_enabled(certificate.view_number()) {
1115 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1116 DaConsensusMessage::DaCertificate2(certificate),
1117 ))
1118 } else {
1119 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1120 DaConsensusMessage::DaCertificate(certificate.to_dac()),
1121 ))
1122 };
1123
1124 Some((sender, message, TransmitType::Broadcast))
1125 },
1126 HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1127 let view_number = vote.view_number() + vote.date().relay;
1128 let leader = match self
1129 .membership_coordinator
1130 .membership_for_epoch(self.epoch)
1131 .ok()?
1132 .leader(view_number)
1133 {
1134 Ok(l) => l,
1135 Err(e) => {
1136 tracing::warn!(
1137 "Failed to calculate leader for view number {view_number}. Error: \
1138 {e:?}"
1139 );
1140 return None;
1141 },
1142 };
1143 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1144 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1145 GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1146 ))
1147 } else {
1148 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1149 GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1150 ))
1151 };
1152
1153 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1154 },
1155 HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1156 *maybe_action = Some(HotShotAction::ViewSyncVote);
1157 let view_number = vote.view_number() + vote.date().relay;
1158 let leader = match self
1159 .membership_coordinator
1160 .membership_for_epoch(self.epoch)
1161 .ok()?
1162 .leader(view_number)
1163 {
1164 Ok(l) => l,
1165 Err(e) => {
1166 tracing::warn!(
1167 "Failed to calculate leader for view number {view_number}. Error: \
1168 {e:?}"
1169 );
1170 return None;
1171 },
1172 };
1173 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1174 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1175 GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1176 ))
1177 } else {
1178 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1179 GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1180 ))
1181 };
1182
1183 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1184 },
1185 HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1186 *maybe_action = Some(HotShotAction::ViewSyncVote);
1187 let view_number = vote.view_number() + vote.date().relay;
1188 let leader = match self
1189 .membership_coordinator
1190 .membership_for_epoch(self.epoch)
1191 .ok()?
1192 .leader(view_number)
1193 {
1194 Ok(l) => l,
1195 Err(e) => {
1196 tracing::warn!(
1197 "Failed to calculate leader for view number {view_number:?}. Error: \
1198 {e:?}"
1199 );
1200 return None;
1201 },
1202 };
1203 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1204 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1205 GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1206 ))
1207 } else {
1208 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1209 GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1210 ))
1211 };
1212
1213 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1214 },
1215 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1216 let view_number = certificate.view_number();
1217 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1218 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1219 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1220 ))
1221 } else {
1222 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1223 GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1224 ))
1225 };
1226
1227 Some((sender, message, TransmitType::Broadcast))
1228 },
1229 HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1230 let view_number = certificate.view_number();
1231 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1232 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1233 GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1234 ))
1235 } else {
1236 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1237 GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1238 ))
1239 };
1240
1241 Some((sender, message, TransmitType::Broadcast))
1242 },
1243 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1244 let view_number = certificate.view_number();
1245 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1246 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1247 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1248 ))
1249 } else {
1250 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1251 GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1252 ))
1253 };
1254
1255 Some((sender, message, TransmitType::Broadcast))
1256 },
1257 HotShotEvent::TimeoutVoteSend(vote) => {
1258 *maybe_action = Some(HotShotAction::TimeoutVote);
1259 let view_number = vote.view_number() + 1;
1260 let leader = match self
1261 .membership_coordinator
1262 .membership_for_epoch(self.epoch)
1263 .ok()?
1264 .leader(view_number)
1265 {
1266 Ok(l) => l,
1267 Err(e) => {
1268 tracing::warn!(
1269 "Failed to calculate leader for view number {view_number}. Error: \
1270 {e:?}"
1271 );
1272 return None;
1273 },
1274 };
1275 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1276 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1277 GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1278 ))
1279 } else {
1280 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1281 GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1282 ))
1283 };
1284
1285 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1286 },
1287 HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1288 sender,
1289 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1290 GeneralConsensusMessage::UpgradeProposal(proposal),
1291 )),
1292 TransmitType::Broadcast,
1293 )),
1294 HotShotEvent::UpgradeVoteSend(vote) => {
1295 tracing::error!("Sending upgrade vote!");
1296 let view_number = vote.view_number();
1297 let leader = match self
1298 .membership_coordinator
1299 .membership_for_epoch(self.epoch)
1300 .ok()?
1301 .leader(view_number)
1302 {
1303 Ok(l) => l,
1304 Err(e) => {
1305 tracing::warn!(
1306 "Failed to calculate leader for view number {view_number}. Error: \
1307 {e:?}"
1308 );
1309 return None;
1310 },
1311 };
1312 Some((
1313 vote.signing_key(),
1314 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1315 GeneralConsensusMessage::UpgradeVote(vote.clone()),
1316 )),
1317 TransmitType::Direct(leader),
1318 ))
1319 },
1320 HotShotEvent::ViewChange(view, epoch) => {
1321 self.view = view;
1322 if epoch > self.epoch {
1323 self.epoch = epoch;
1324 }
1325 let keep_view = ViewNumber::new(view.saturating_sub(1));
1326 self.cancel_tasks(keep_view);
1327 let net = Arc::clone(&self.network);
1328 let epoch = self.epoch.map(|x| x.u64().into());
1329 let membership_coordinator = self.membership_coordinator.clone();
1330 spawn(async move {
1331 net.update_view::<TYPES>(keep_view.u64().into(), epoch, membership_coordinator)
1332 .await;
1333 });
1334 None
1335 },
1336 HotShotEvent::VidRequestSend(req, sender, to) => Some((
1337 sender,
1338 MessageKind::Data(DataMessage::RequestData(req)),
1339 TransmitType::Direct(to),
1340 )),
1341 HotShotEvent::VidResponseSend(sender, to, proposal) => {
1342 let epochs_enabled = self
1343 .upgrade_lock
1344 .epochs_enabled(proposal.data.view_number());
1345 let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
1346 let message = match proposal.data {
1347 VidDisperseShare::V0(data) => {
1348 if epochs_enabled {
1349 tracing::warn!(
1350 "Epochs are enabled for view {} but still receive \
1351 VidDisperseShare0",
1352 data.view_number()
1353 );
1354 return None;
1355 }
1356 let vid_share_proposal = Proposal {
1357 data,
1358 signature: proposal.signature,
1359 _pd: proposal._pd,
1360 };
1361 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1362 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1363 vid_share_proposal,
1364 )),
1365 )))
1366 },
1367 VidDisperseShare::V1(data) => {
1368 if !epochs_enabled {
1369 tracing::warn!(
1370 "Epochs are enabled for view {} but didn't receive \
1371 VidDisperseShare0",
1372 data.view_number()
1373 );
1374 return None;
1375 }
1376 if upgraded_vid2 {
1377 tracing::warn!(
1378 "VID2 upgrade is enabled for view {} but didn't receive \
1379 VidDisperseShare2",
1380 data.view_number()
1381 );
1382 return None;
1383 }
1384 let vid_share_proposal = Proposal {
1385 data,
1386 signature: proposal.signature,
1387 _pd: proposal._pd,
1388 };
1389 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1390 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
1391 vid_share_proposal,
1392 )),
1393 )))
1394 },
1395 VidDisperseShare::V2(data) => {
1396 if !upgraded_vid2 {
1397 tracing::warn!(
1398 "VID2 upgrade is not enabled for view {} but receive \
1399 VidDisperseShare2",
1400 data.view_number()
1401 );
1402 return None;
1403 }
1404 let vid_share_proposal = Proposal {
1405 data,
1406 signature: proposal.signature,
1407 _pd: proposal._pd,
1408 };
1409 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1410 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1411 vid_share_proposal,
1412 )),
1413 )))
1414 },
1415 };
1416 Some((sender, message, TransmitType::Direct(to)))
1417 },
1418 HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1419 sender,
1420 MessageKind::Consensus(SequencingMessage::General(
1421 GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1422 )),
1423 TransmitType::Direct(leader),
1424 )),
1425 HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1426 let message = if self
1427 .upgrade_lock
1428 .proposal2_version(epoch_root_qc.view_number())
1429 {
1430 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1431 GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1432 ))
1433 } else {
1434 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1435 GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1436 ))
1437 };
1438
1439 Some((sender, message, TransmitType::Direct(leader)))
1440 },
1441 HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1442 sender,
1443 MessageKind::Consensus(SequencingMessage::General(
1444 GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1445 )),
1446 TransmitType::Broadcast,
1447 )),
1448 _ => None,
1449 }
1450 }
1451
1452 async fn spawn_transmit_task(
1454 &mut self,
1455 message_kind: MessageKind<TYPES>,
1456 maybe_action: Option<HotShotAction>,
1457 transmit: TransmitType<TYPES>,
1458 sender: TYPES::SignatureKey,
1459 ) {
1460 let broadcast_delay = match &message_kind {
1461 MessageKind::Consensus(
1462 SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1463 | SequencingMessage::Da(_),
1464 ) => BroadcastDelay::View(*message_kind.view_number()),
1465 _ => BroadcastDelay::None,
1466 };
1467 let message = Message {
1468 sender,
1469 kind: message_kind,
1470 };
1471 let view_number = message.kind.view_number();
1472 let epoch = message.kind.epoch();
1473 let committee_topic = Topic::Global;
1474 let Ok(mem) = self
1475 .membership_coordinator
1476 .stake_table_for_epoch(self.epoch)
1477 else {
1478 return;
1479 };
1480 let da_committee: BTreeSet<_> = mem.da_committee_members(view_number).cloned().collect();
1481 let network = Arc::clone(&self.network);
1482 let storage = self.storage.clone();
1483 let storage_metrics = Arc::clone(&self.storage_metrics);
1484 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1485 let upgrade_lock = self.upgrade_lock.clone();
1486 let handle = spawn(async move {
1487 if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
1488 maybe_action,
1489 storage.clone(),
1490 consensus,
1491 view_number,
1492 epoch,
1493 )
1494 .await
1495 .is_err()
1496 {
1497 return;
1498 }
1499 if let MessageKind::Consensus(SequencingMessage::General(
1500 GeneralConsensusMessage::Proposal(prop),
1501 )) = &message.kind
1502 {
1503 let now = Instant::now();
1504 if storage
1505 .append_proposal2(&convert_proposal(prop.clone()))
1506 .await
1507 .is_err()
1508 {
1509 return;
1510 }
1511 storage_metrics
1512 .append_quorum_duration
1513 .add_point(now.elapsed().as_secs_f64());
1514 }
1515
1516 let serialized_message = match upgrade_lock.serialize(&message) {
1517 Ok(serialized) => serialized,
1518 Err(e) => {
1519 tracing::error!("Failed to serialize message: {e}");
1520 return;
1521 },
1522 };
1523
1524 let transmit_result = match transmit {
1525 TransmitType::Direct(recipient) => {
1526 network
1527 .direct_message(view_number.u64().into(), serialized_message, recipient)
1528 .await
1529 },
1530 TransmitType::Broadcast => {
1531 network
1532 .broadcast_message(
1533 view_number.u64().into(),
1534 serialized_message,
1535 committee_topic,
1536 broadcast_delay,
1537 )
1538 .await
1539 },
1540 TransmitType::DaCommitteeBroadcast => {
1541 network
1542 .da_broadcast_message(
1543 view_number.u64().into(),
1544 serialized_message,
1545 da_committee.iter().cloned().collect(),
1546 broadcast_delay,
1547 )
1548 .await
1549 },
1550 };
1551
1552 match transmit_result {
1553 Ok(()) => {},
1554 Err(e) => {
1555 LogEvent::NetworkSendFailure.record();
1556 tracing::debug!("Failed to send message task: {e:?}");
1557 },
1558 }
1559 });
1560 self.transmit_tasks
1561 .entry(view_number)
1562 .or_default()
1563 .push(handle);
1564 }
1565}
1566
1567pub mod test {
1569 use std::ops::{Deref, DerefMut};
1570
1571 use async_trait::async_trait;
1572
1573 use super::{
1574 Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1575 Receiver, Result, Sender, Storage, TaskState, TransmitType,
1576 };
1577
1578 pub type ModifierClosure<TYPES> = dyn Fn(
1581 &mut <TYPES as NodeType>::SignatureKey,
1582 &mut MessageKind<TYPES>,
1583 &mut TransmitType<TYPES>,
1584 &<TYPES as NodeType>::Membership,
1585 ) + Send
1586 + Sync;
1587
1588 pub struct NetworkEventTaskStateModifier<
1590 TYPES: NodeType,
1591 NET: ConnectedNetwork<TYPES::SignatureKey>,
1592 S: Storage<TYPES>,
1593 > {
1594 pub network_event_task_state: NetworkEventTaskState<TYPES, NET, S>,
1596 pub modifier: Arc<ModifierClosure<TYPES>>,
1599 }
1600
1601 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1602 NetworkEventTaskStateModifier<TYPES, NET, S>
1603 {
1604 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1606 let mut maybe_action = None;
1607 if let Some((mut sender, mut message_kind, mut transmit)) =
1608 self.parse_event(event, &mut maybe_action).await
1609 {
1610 (self.modifier)(
1612 &mut sender,
1613 &mut message_kind,
1614 &mut transmit,
1615 self.membership_coordinator.membership(),
1616 );
1617
1618 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1619 .await;
1620 }
1621 }
1622 }
1623
1624 #[async_trait]
1625 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1626 TaskState for NetworkEventTaskStateModifier<TYPES, NET, S>
1627 {
1628 type Event = HotShotEvent<TYPES>;
1629
1630 async fn handle_event(
1631 &mut self,
1632 event: Arc<Self::Event>,
1633 _sender: &Sender<Arc<Self::Event>>,
1634 _receiver: &Receiver<Arc<Self::Event>>,
1635 ) -> Result<()> {
1636 self.handle(event).await;
1637
1638 Ok(())
1639 }
1640
1641 fn cancel_subtasks(&mut self) {}
1642 }
1643
1644 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> Deref
1645 for NetworkEventTaskStateModifier<TYPES, NET, S>
1646 {
1647 type Target = NetworkEventTaskState<TYPES, NET, S>;
1648
1649 fn deref(&self) -> &Self::Target {
1650 &self.network_event_task_state
1651 }
1652 }
1653
1654 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> DerefMut
1655 for NetworkEventTaskStateModifier<TYPES, NET, S>
1656 {
1657 fn deref_mut(&mut self) -> &mut Self::Target {
1658 &mut self.network_event_task_state
1659 }
1660 }
1661}