Skip to main content

hotshot_task_impls/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, BTreeSet, HashMap},
9    hash::{DefaultHasher, Hash},
10    sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_libp2p_networking::network::log_summary::LogEvent;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18    consensus::OuterConsensus,
19    data::{EpochNumber, VidDisperse, VidDisperseShare, ViewNumber},
20    epoch_membership::EpochMembershipCoordinator,
21    event::{Event, EventType, HotShotAction},
22    message::{
23        DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal,
24        SequencingMessage, UpgradeLock, convert_proposal,
25    },
26    simple_vote::HasEpoch,
27    storage_metrics::StorageMetricsValue,
28    traits::{
29        network::{
30            BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
31            ViewMessage,
32        },
33        node_implementation::NodeType,
34        storage::Storage,
35    },
36    vote::{HasViewNumber, Vote},
37};
38use hotshot_utils::anytrace::*;
39use tokio::{spawn, task::JoinHandle, time::Instant};
40use tracing::instrument;
41
42use crate::{
43    events::{HotShotEvent, HotShotTaskCompleted},
44    helpers::broadcast_event,
45};
46
47/// State of the network message task, which maps received network messages to `HotShotEvent`s
48#[derive(Clone)]
49pub struct NetworkMessageTaskState<TYPES: NodeType> {
50    /// Sender used to broadcast the internal events this task generates to other tasks
51    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
52
53    /// Sender used to send the external events this task generates to the event stream
54    pub external_event_stream: Sender<Event<TYPES>>,
55
56    /// This node's public key
57    pub public_key: TYPES::SignatureKey,
58
59    /// Lock for a decided upgrade; `handle_message` queries it per view to drop wrong-version messages
60    pub upgrade_lock: UpgradeLock<TYPES>,
61
62    /// Node's id; recorded as the `id` field on the `handle_message` tracing span
63    pub id: u64,
64}
65
66impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {
67    #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
68    /// Handles a (deserialized) message from the network
69    pub async fn handle_message(&mut self, message: Message<TYPES>) {
70        match &message.kind {
71            MessageKind::Consensus(_) => {
72                tracing::debug!("Received consensus message from network: {:?}", message)
73            },
74            MessageKind::Data(_) => {
75                tracing::trace!("Received data message from network: {:?}", message)
76            },
77            MessageKind::External(_) => {
78                tracing::trace!("Received external message from network: {:?}", message)
79            },
80        }
81
82        // Match the message kind and send the appropriate event to the internal event stream
83        let sender = message.sender;
84        match message.kind {
85            // Handle consensus messages
86            MessageKind::Consensus(consensus_message) => {
87                let event = match consensus_message {
88                    SequencingMessage::General(general_message) => match general_message {
89                        GeneralConsensusMessage::Proposal(proposal) => {
90                            if self
91                                .upgrade_lock
92                                .epochs_enabled(proposal.data.view_number())
93                            {
94                                tracing::warn!(
95                                    "received GeneralConsensusMessage::Proposal for view {} but \
96                                     epochs are enabled for that view",
97                                    proposal.data.view_number()
98                                );
99                                return;
100                            }
101                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
102                        },
103                        GeneralConsensusMessage::Proposal2Legacy(proposal) => {
104                            if !self
105                                .upgrade_lock
106                                .proposal2_legacy_version(proposal.data.view_number())
107                            {
108                                tracing::warn!(
109                                    "received GeneralConsensusMessage::Proposal2Legacy for view \
110                                     {} but we are in the wrong version for that message type",
111                                    proposal.data.view_number()
112                                );
113                                return;
114                            }
115                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
116                        },
117                        GeneralConsensusMessage::Proposal2(proposal) => {
118                            if !self
119                                .upgrade_lock
120                                .proposal2_version(proposal.data.view_number())
121                            {
122                                tracing::warn!(
123                                    "received GeneralConsensusMessage::Proposal2 for view {} but \
124                                     we are in the wrong version for that message type",
125                                    proposal.data.view_number()
126                                );
127                                return;
128                            }
129                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
130                        },
131                        GeneralConsensusMessage::ProposalRequested(req, sig) => {
132                            HotShotEvent::QuorumProposalRequestRecv(req, sig)
133                        },
134                        GeneralConsensusMessage::ProposalResponse(proposal) => {
135                            if self
136                                .upgrade_lock
137                                .epochs_enabled(proposal.data.view_number())
138                            {
139                                tracing::warn!(
140                                    "received GeneralConsensusMessage::ProposalResponse for view \
141                                     {} but we are in the wrong version for that message type",
142                                    proposal.data.view_number()
143                                );
144                                return;
145                            }
146                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
147                        },
148                        GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
149                            if !self
150                                .upgrade_lock
151                                .proposal2_legacy_version(proposal.data.view_number())
152                            {
153                                tracing::warn!(
154                                    "received GeneralConsensusMessage::ProposalResponse2Legacy \
155                                     for view {} but we are in the wrong version for that message \
156                                     type",
157                                    proposal.data.view_number()
158                                );
159                                return;
160                            }
161                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
162                        },
163                        GeneralConsensusMessage::ProposalResponse2(proposal) => {
164                            if !self
165                                .upgrade_lock
166                                .proposal2_version(proposal.data.view_number())
167                            {
168                                tracing::warn!(
169                                    "received GeneralConsensusMessage::ProposalResponse2 for view \
170                                     {} but epochs are not enabled for that view",
171                                    proposal.data.view_number()
172                                );
173                                return;
174                            }
175                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
176                        },
177                        GeneralConsensusMessage::Vote(vote) => {
178                            if self.upgrade_lock.epochs_enabled(vote.view_number()) {
179                                tracing::warn!(
180                                    "received GeneralConsensusMessage::Vote for view {} but \
181                                     epochs are enabled for that view",
182                                    vote.view_number()
183                                );
184                                return;
185                            }
186                            HotShotEvent::QuorumVoteRecv(vote.to_vote2())
187                        },
188                        GeneralConsensusMessage::Vote2(vote) => {
189                            if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
190                                tracing::warn!(
191                                    "received GeneralConsensusMessage::Vote2 for view {} but \
192                                     epochs are not enabled for that view",
193                                    vote.view_number()
194                                );
195                                return;
196                            }
197                            HotShotEvent::QuorumVoteRecv(vote)
198                        },
199                        GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
200                            if self
201                                .upgrade_lock
202                                .epochs_enabled(view_sync_message.view_number())
203                            {
204                                tracing::warn!(
205                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
206                                     view {} but epochs are enabled for that view",
207                                    view_sync_message.view_number()
208                                );
209                                return;
210                            }
211                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
212                        },
213                        GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
214                            if !self
215                                .upgrade_lock
216                                .epochs_enabled(view_sync_message.view_number())
217                            {
218                                tracing::warn!(
219                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
220                                     view {} but epochs are not enabled for that view",
221                                    view_sync_message.view_number()
222                                );
223                                return;
224                            }
225                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
226                        },
227                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(
228                            view_sync_message,
229                        ) => {
230                            if self
231                                .upgrade_lock
232                                .epochs_enabled(view_sync_message.view_number())
233                            {
234                                tracing::warn!(
235                                    "received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view",
236                                    view_sync_message.view_number()
237                                );
238                                return;
239                            }
240                            HotShotEvent::ViewSyncPreCommitCertificateRecv(
241                                view_sync_message.to_vsc2(),
242                            )
243                        },
244                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
245                            view_sync_message,
246                        ) => {
247                            if !self
248                                .upgrade_lock
249                                .epochs_enabled(view_sync_message.view_number())
250                            {
251                                tracing::warn!(
252                                    "received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view",
253                                    view_sync_message.view_number()
254                                );
255                                return;
256                            }
257                            HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
258                        },
259                        GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
260                            if self
261                                .upgrade_lock
262                                .epochs_enabled(view_sync_message.view_number())
263                            {
264                                tracing::warn!(
265                                    "received GeneralConsensusMessage::ViewSyncCommitVote for \
266                                     view {} but epochs are enabled for that view",
267                                    view_sync_message.view_number()
268                                );
269                                return;
270                            }
271                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
272                        },
273                        GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
274                            if !self
275                                .upgrade_lock
276                                .epochs_enabled(view_sync_message.view_number())
277                            {
278                                tracing::warn!(
279                                    "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
280                                     view {} but epochs are not enabled for that view",
281                                    view_sync_message.view_number()
282                                );
283                                return;
284                            }
285                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
286                        },
287                        GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
288                            if self
289                                .upgrade_lock
290                                .epochs_enabled(view_sync_message.view_number())
291                            {
292                                tracing::warn!(
293                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate \
294                                     for view {} but epochs are enabled for that view",
295                                    view_sync_message.view_number()
296                                );
297                                return;
298                            }
299                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
300                        },
301                        GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
302                            if !self
303                                .upgrade_lock
304                                .epochs_enabled(view_sync_message.view_number())
305                            {
306                                tracing::warn!(
307                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
308                                     for view {} but epochs are not enabled for that view",
309                                    view_sync_message.view_number()
310                                );
311                                return;
312                            }
313                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
314                        },
315                        GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
316                            if self
317                                .upgrade_lock
318                                .epochs_enabled(view_sync_message.view_number())
319                            {
320                                tracing::warn!(
321                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
322                                     view {} but epochs are enabled for that view",
323                                    view_sync_message.view_number()
324                                );
325                                return;
326                            }
327                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
328                        },
329                        GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
330                            if !self
331                                .upgrade_lock
332                                .epochs_enabled(view_sync_message.view_number())
333                            {
334                                tracing::warn!(
335                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
336                                     view {} but epochs are not enabled for that view",
337                                    view_sync_message.view_number()
338                                );
339                                return;
340                            }
341                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
342                        },
343                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
344                            if self
345                                .upgrade_lock
346                                .epochs_enabled(view_sync_message.view_number())
347                            {
348                                tracing::warn!(
349                                    "received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view",
350                                    view_sync_message.view_number()
351                                );
352                                return;
353                            }
354                            HotShotEvent::ViewSyncFinalizeCertificateRecv(
355                                view_sync_message.to_vsc2(),
356                            )
357                        },
358                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
359                            view_sync_message,
360                        ) => {
361                            if !self
362                                .upgrade_lock
363                                .epochs_enabled(view_sync_message.view_number())
364                            {
365                                tracing::warn!(
366                                    "received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view",
367                                    view_sync_message.view_number()
368                                );
369                                return;
370                            }
371                            HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
372                        },
373                        GeneralConsensusMessage::TimeoutVote(message) => {
374                            if self.upgrade_lock.epochs_enabled(message.view_number()) {
375                                tracing::warn!(
376                                    "received GeneralConsensusMessage::TimeoutVote for view {} \
377                                     but epochs are enabled for that view",
378                                    message.view_number()
379                                );
380                                return;
381                            }
382                            HotShotEvent::TimeoutVoteRecv(message.to_vote2())
383                        },
384                        GeneralConsensusMessage::TimeoutVote2(message) => {
385                            if !self.upgrade_lock.epochs_enabled(message.view_number()) {
386                                tracing::warn!(
387                                    "received GeneralConsensusMessage::TimeoutVote2 for view {} \
388                                     but epochs are not enabled for that view",
389                                    message.view_number()
390                                );
391                                return;
392                            }
393                            HotShotEvent::TimeoutVoteRecv(message)
394                        },
395                        GeneralConsensusMessage::UpgradeProposal(message) => {
396                            HotShotEvent::UpgradeProposalRecv(message, sender)
397                        },
398                        GeneralConsensusMessage::UpgradeVote(message) => {
399                            tracing::error!("Received upgrade vote!");
400                            HotShotEvent::UpgradeVoteRecv(message)
401                        },
402                        GeneralConsensusMessage::HighQc(qc, next_qc) => {
403                            HotShotEvent::HighQcRecv(qc, next_qc, sender)
404                        },
405                        GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
406                            HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
407                        },
408                        GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
409                            if !self
410                                .upgrade_lock
411                                .proposal2_legacy_version(vote.view_number())
412                            {
413                                tracing::warn!(
414                                    "received GeneralConsensusMessage::EpochRootQuorumVote for \
415                                     view {} but we do not expect this message in this version",
416                                    vote.view_number()
417                                );
418                                return;
419                            }
420                            HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
421                        },
422                        GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
423                            if !self.upgrade_lock.proposal2_version(vote.view_number()) {
424                                tracing::warn!(
425                                    "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
426                                     view {} but we do not expect this message in this version",
427                                    vote.view_number()
428                                );
429                                return;
430                            }
431                            HotShotEvent::EpochRootQuorumVoteRecv(vote)
432                        },
433                        GeneralConsensusMessage::EpochRootQc(root_qc) => {
434                            if !self.upgrade_lock.proposal2_version(root_qc.view_number()) {
435                                tracing::warn!(
436                                    "received GeneralConsensusMessage::EpochRootQc for view {} \
437                                     but we are in the wrong version for that message types",
438                                    root_qc.view_number()
439                                );
440                                return;
441                            }
442                            HotShotEvent::EpochRootQcRecv(root_qc, sender)
443                        },
444                        GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
445                            if !self
446                                .upgrade_lock
447                                .proposal2_legacy_version(root_qc.view_number())
448                            {
449                                tracing::warn!(
450                                    "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
451                                     but we are in the wrong version for that message type",
452                                    root_qc.view_number()
453                                );
454                                return;
455                            }
456                            HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
457                        },
458                    },
459                    SequencingMessage::Da(da_message) => match da_message {
460                        DaConsensusMessage::DaProposal(proposal) => {
461                            if self
462                                .upgrade_lock
463                                .epochs_enabled(proposal.data.view_number())
464                            {
465                                tracing::warn!(
466                                    "received DaConsensusMessage::DaProposal for view {} but \
467                                     epochs are enabled for that view",
468                                    proposal.data.view_number()
469                                );
470                                return;
471                            }
472                            HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
473                        },
474                        DaConsensusMessage::DaProposal2(proposal) => {
475                            if !self
476                                .upgrade_lock
477                                .epochs_enabled(proposal.data.view_number())
478                            {
479                                tracing::warn!(
480                                    "received DaConsensusMessage::DaProposal2 for view {} but \
481                                     epochs are not enabled for that view",
482                                    proposal.data.view_number()
483                                );
484                                return;
485                            }
486                            HotShotEvent::DaProposalRecv(proposal, sender)
487                        },
488                        DaConsensusMessage::DaVote(vote) => {
489                            if self.upgrade_lock.epochs_enabled(vote.view_number()) {
490                                tracing::warn!(
491                                    "received DaConsensusMessage::DaVote for view {} but epochs \
492                                     are enabled for that view",
493                                    vote.view_number()
494                                );
495                                return;
496                            }
497                            HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
498                        },
499                        DaConsensusMessage::DaVote2(vote) => {
500                            if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
501                                tracing::warn!(
502                                    "received DaConsensusMessage::DaVote2 for view {} but epochs \
503                                     are not enabled for that view",
504                                    vote.view_number()
505                                );
506                                return;
507                            }
508                            HotShotEvent::DaVoteRecv(vote.clone())
509                        },
510                        DaConsensusMessage::DaCertificate(cert) => {
511                            if self.upgrade_lock.epochs_enabled(cert.view_number()) {
512                                tracing::warn!(
513                                    "received DaConsensusMessage::DaCertificate for view {} but \
514                                     epochs are enabled for that view",
515                                    cert.view_number()
516                                );
517                                return;
518                            }
519                            HotShotEvent::DaCertificateRecv(cert.to_dac2())
520                        },
521                        DaConsensusMessage::DaCertificate2(cert) => {
522                            if !self.upgrade_lock.epochs_enabled(cert.view_number()) {
523                                tracing::warn!(
524                                    "received DaConsensusMessage::DaCertificate2 for view {} but \
525                                     epochs are not enabled for that view",
526                                    cert.view_number()
527                                );
528                                return;
529                            }
530                            HotShotEvent::DaCertificateRecv(cert)
531                        },
532                        DaConsensusMessage::VidDisperseMsg(proposal) => {
533                            if self
534                                .upgrade_lock
535                                .epochs_enabled(proposal.data.view_number())
536                            {
537                                tracing::warn!(
538                                    "received DaConsensusMessage::VidDisperseMsg for view {} but \
539                                     epochs are enabled for that view",
540                                    proposal.data.view_number()
541                                );
542                                return;
543                            }
544                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
545                        },
546                        DaConsensusMessage::VidDisperseMsg1(proposal) => {
547                            if !self
548                                .upgrade_lock
549                                .epochs_enabled(proposal.data.view_number())
550                            {
551                                tracing::warn!(
552                                    "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
553                                     epochs are not enabled for that view",
554                                    proposal.data.view_number()
555                                );
556                                return;
557                            }
558                            if self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
559                                tracing::warn!(
560                                    "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
561                                     vid2 upgrade is enabled for that view",
562                                    proposal.data.view_number()
563                                );
564                                return;
565                            }
566                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
567                        },
568                        DaConsensusMessage::VidDisperseMsg2(proposal) => {
569                            if !self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
570                                tracing::warn!(
571                                    "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
572                                     vid2 upgrade is not enabled for that view",
573                                    proposal.data.view_number()
574                                );
575                                return;
576                            }
577                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
578                        },
579                    },
580                };
581                broadcast_event(Arc::new(event), &self.internal_event_stream).await;
582            },
583
584            // Handle data messages
585            MessageKind::Data(message) => match message {
586                DataMessage::SubmitTransaction(transaction, _) => {
587                    let mut hasher = DefaultHasher::new();
588                    transaction.hash(&mut hasher);
589                    broadcast_event(
590                        Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
591                        &self.internal_event_stream,
592                    )
593                    .await;
594                },
595                DataMessage::DataResponse(response) => {
596                    if let ResponseMessage::Found(message) = response {
597                        match message {
598                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
599                                broadcast_event(
600                                    Arc::new(HotShotEvent::VidResponseRecv(
601                                        sender,
602                                        convert_proposal(proposal),
603                                    )),
604                                    &self.internal_event_stream,
605                                )
606                                .await;
607                            },
608                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
609                                proposal,
610                            )) => {
611                                broadcast_event(
612                                    Arc::new(HotShotEvent::VidResponseRecv(
613                                        sender,
614                                        convert_proposal(proposal),
615                                    )),
616                                    &self.internal_event_stream,
617                                )
618                                .await;
619                            },
620                            _ => {},
621                        }
622                    }
623                },
624                DataMessage::RequestData(data) => {
625                    let req_data = data.clone();
626                    if let RequestKind::Vid(_view_number, _key) = req_data.request {
627                        broadcast_event(
628                            Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
629                            &self.internal_event_stream,
630                        )
631                        .await;
632                    }
633                },
634            },
635
636            // Handle external messages
637            MessageKind::External(data) => {
638                if sender == self.public_key {
639                    return;
640                }
641                // Send the external message to the external event stream so it can be processed
642                broadcast_event(
643                    Event {
644                        view_number: ViewNumber::new(1),
645                        event: EventType::ExternalMessageReceived { sender, data },
646                    },
647                    &self.external_event_stream,
648                )
649                .await;
650            },
651        }
652    }
653}
654
655/// network event task state
656pub struct NetworkEventTaskState<
657    TYPES: NodeType,
658    NET: ConnectedNetwork<TYPES::SignatureKey>,
659    S: Storage<TYPES>,
660> {
661    /// comm network
662    pub network: Arc<NET>,
663
664    /// view number
665    pub view: ViewNumber,
666
667    /// epoch number
668    pub epoch: Option<EpochNumber>,
669
670    /// network memberships
671    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
672
673    /// Storage to store actionable events
674    pub storage: S,
675
676    /// Storage metrics
677    pub storage_metrics: Arc<StorageMetricsValue>,
678
679    /// Shared consensus state
680    pub consensus: OuterConsensus<TYPES>,
681
682    /// Lock for a decided upgrade
683    pub upgrade_lock: UpgradeLock<TYPES>,
684
685    /// map view number to transmit tasks
686    pub transmit_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
687
688    /// Number of blocks in an epoch, zero means there are no epochs
689    pub epoch_height: u64,
690
691    /// Node's id
692    pub id: u64,
693}
694
695#[async_trait]
696impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
697    TaskState for NetworkEventTaskState<TYPES, NET, S>
698{
699    type Event = HotShotEvent<TYPES>;
700
701    async fn handle_event(
702        &mut self,
703        event: Arc<Self::Event>,
704        _sender: &Sender<Arc<Self::Event>>,
705        _receiver: &Receiver<Arc<Self::Event>>,
706    ) -> Result<()> {
707        self.handle(event).await;
708
709        Ok(())
710    }
711
712    fn cancel_subtasks(&mut self) {}
713}
714
715impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
716    NetworkEventTaskState<TYPES, NET, S>
717{
718    /// Handle the given event.
719    ///
720    /// Returns the completion status.
721    #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
722    pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
723        let mut maybe_action = None;
724        if let Some((sender, message_kind, transmit)) =
725            self.parse_event(event, &mut maybe_action).await
726        {
727            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
728                .await;
729        };
730    }
731
732    /// handle `VidDisperseSend`
733    async fn handle_vid_disperse_proposal(
734        &self,
735        vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
736        sender: &<TYPES as NodeType>::SignatureKey,
737    ) -> Option<HotShotTaskCompleted> {
738        let view = vid_proposal.data.view_number();
739        let epoch = vid_proposal.data.epoch();
740        let vid_share_proposals = VidDisperse::to_share_proposals(vid_proposal);
741        let mut messages = HashMap::new();
742
743        for proposal in vid_share_proposals {
744            let recipient = proposal.data.recipient_key().clone();
745            let epochs_enabled = self
746                .upgrade_lock
747                .epochs_enabled(proposal.data.view_number());
748            let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
749            let message = if !epochs_enabled {
750                let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
751                    Proposal {
752                        data,
753                        signature: proposal.signature,
754                        _pd: proposal._pd,
755                    }
756                } else {
757                    tracing::warn!(
758                        "Epochs are not enabled for view {} but didn't receive VidDisperseShare1",
759                        proposal.data.view_number()
760                    );
761                    return None;
762                };
763                Message {
764                    sender: sender.clone(),
765                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
766                        DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
767                    )),
768                }
769            } else if !upgraded_vid2 {
770                let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
771                    Proposal {
772                        data,
773                        signature: proposal.signature,
774                        _pd: proposal._pd,
775                    }
776                } else {
777                    tracing::warn!(
778                        "Epochs are enabled and Vid2Upgrade is not enabled for view {} but didn't \
779                         receive VidDisperseShare2",
780                        proposal.data.view_number()
781                    );
782                    return None;
783                };
784                Message {
785                    sender: sender.clone(),
786                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
787                        DaConsensusMessage::VidDisperseMsg1(vid_share_proposal),
788                    )),
789                }
790            } else {
791                let vid_share_proposal = if let VidDisperseShare::V2(data) = proposal.data {
792                    Proposal {
793                        data,
794                        signature: proposal.signature,
795                        _pd: proposal._pd,
796                    }
797                } else {
798                    tracing::warn!(
799                        "Vid2Upgrade is enabled for view {} but didn't receive VidDisperseShare2",
800                        proposal.data.view_number()
801                    );
802                    return None;
803                };
804                Message {
805                    sender: sender.clone(),
806                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
807                        DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
808                    )),
809                }
810            };
811            let view = message.view_number();
812            let serialized_message = match self.upgrade_lock.serialize(&message) {
813                Ok(serialized) => serialized,
814                Err(e) => {
815                    tracing::error!("Failed to serialize message: {e}");
816                    continue;
817                },
818            };
819
820            messages.insert(recipient, (view.u64().into(), serialized_message));
821        }
822
823        let net = Arc::clone(&self.network);
824        let storage = self.storage.clone();
825        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
826        spawn(async move {
827            if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
828                Some(HotShotAction::VidDisperse),
829                storage,
830                consensus,
831                view,
832                epoch,
833            )
834            .await
835            .is_err()
836            {
837                return;
838            }
839            match net.vid_broadcast_message(messages).await {
840                Ok(()) => {},
841                Err(e) => {
842                    LogEvent::NetworkSendFailure.record();
843                    tracing::debug!("Failed to send message from network task: {e:?}");
844                },
845            }
846        });
847
848        None
849    }
850
851    /// Record `HotShotAction` if available
852    async fn maybe_record_action(
853        maybe_action: Option<HotShotAction>,
854        storage: S,
855        consensus: OuterConsensus<TYPES>,
856        view: ViewNumber,
857        epoch: Option<EpochNumber>,
858    ) -> std::result::Result<(), ()> {
859        if let Some(mut action) = maybe_action {
860            if !consensus.write().await.update_action(action, view) {
861                return Err(());
862            }
863            // If the action was view sync record it as a vote, but we don't
864            // want to limit to 1 View sync vote above so change the action here.
865            if matches!(action, HotShotAction::ViewSyncVote) {
866                action = HotShotAction::Vote;
867            }
868            match storage.record_action(view, epoch, action).await {
869                Ok(()) => Ok(()),
870                Err(e) => {
871                    tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
872                    Err(())
873                },
874            }
875        } else {
876            Ok(())
877        }
878    }
879
880    /// Cancel all tasks for previous views
881    pub fn cancel_tasks(&mut self, view: ViewNumber) {
882        let keep = self.transmit_tasks.split_off(&view);
883
884        while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
885            for task in tasks {
886                task.abort();
887            }
888        }
889
890        self.transmit_tasks = keep;
891    }
892
893    /// Parses a `HotShotEvent` and returns a tuple of: (sender's public key, `MessageKind`, `TransmitType`)
894    /// which will be used to create a message and transmit on the wire.
895    /// Returns `None` if the parsing result should not be sent on the wire.
896    /// Handles the `VidDisperseSend` event separately using a helper method.
897    #[allow(clippy::too_many_lines)]
898    async fn parse_event(
899        &mut self,
900        event: Arc<HotShotEvent<TYPES>>,
901        maybe_action: &mut Option<HotShotAction>,
902    ) -> Option<(
903        <TYPES as NodeType>::SignatureKey,
904        MessageKind<TYPES>,
905        TransmitType<TYPES>,
906    )> {
907        match event.as_ref().clone() {
908            HotShotEvent::QuorumProposalSend(proposal, sender) => {
909                *maybe_action = Some(HotShotAction::Propose);
910
911                let message = if self
912                    .upgrade_lock
913                    .proposal2_version(proposal.data.view_number())
914                {
915                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
916                        GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
917                    ))
918                } else if self
919                    .upgrade_lock
920                    .proposal2_legacy_version(proposal.data.view_number())
921                {
922                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
923                        GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
924                    ))
925                } else {
926                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
927                        GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
928                    ))
929                };
930
931                Some((sender, message, TransmitType::Broadcast))
932            },
933
934            // ED Each network task is subscribed to all these message types.  Need filters per network task
935            HotShotEvent::QuorumVoteSend(vote) => {
936                *maybe_action = Some(HotShotAction::Vote);
937                let view_number = vote.view_number() + 1;
938                let leader = match self
939                    .membership_coordinator
940                    .membership_for_epoch(vote.epoch())
941                    .ok()?
942                    .leader(view_number)
943                {
944                    Ok(l) => l,
945                    Err(e) => {
946                        tracing::warn!(
947                            "Failed to calculate leader for view number {view_number}. Error: \
948                             {e:?}"
949                        );
950                        return None;
951                    },
952                };
953
954                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
955                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
956                        GeneralConsensusMessage::Vote2(vote.clone()),
957                    ))
958                } else {
959                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
960                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
961                    ))
962                };
963
964                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
965            },
966            HotShotEvent::EpochRootQuorumVoteSend(vote) => {
967                *maybe_action = Some(HotShotAction::Vote);
968                let view_number = vote.view_number() + 1;
969                let leader = match self
970                    .membership_coordinator
971                    .membership_for_epoch(vote.epoch())
972                    .ok()?
973                    .leader(view_number)
974                {
975                    Ok(l) => l,
976                    Err(e) => {
977                        tracing::warn!(
978                            "Failed to calculate leader for view number {:?}. Error: {:?}",
979                            view_number,
980                            e
981                        );
982                        return None;
983                    },
984                };
985
986                let message = if self.upgrade_lock.proposal2_version(vote.view_number()) {
987                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
988                        GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
989                    ))
990                } else if self
991                    .upgrade_lock
992                    .proposal2_legacy_version(vote.view_number())
993                {
994                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
995                        GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
996                    ))
997                } else {
998                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
999                        GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1000                    ))
1001                };
1002
1003                Some((
1004                    vote.vote.signing_key(),
1005                    message,
1006                    TransmitType::Direct(leader),
1007                ))
1008            },
1009            HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1010                *maybe_action = Some(HotShotAction::Vote);
1011                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1012                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1013                        GeneralConsensusMessage::Vote2(vote.clone()),
1014                    ))
1015                } else {
1016                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1017                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1018                    ))
1019                };
1020
1021                Some((vote.signing_key(), message, TransmitType::Broadcast))
1022            },
1023            HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1024                req.key.clone(),
1025                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1026                    GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1027                )),
1028                TransmitType::Broadcast,
1029            )),
1030            HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1031                let message = if self
1032                    .upgrade_lock
1033                    .proposal2_version(proposal.data.view_number())
1034                {
1035                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1036                        GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1037                    ))
1038                } else if self
1039                    .upgrade_lock
1040                    .proposal2_legacy_version(proposal.data.view_number())
1041                {
1042                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1043                        GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1044                            proposal,
1045                        )),
1046                    ))
1047                } else {
1048                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1049                        GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1050                    ))
1051                };
1052
1053                Some((
1054                    sender_key.clone(),
1055                    message,
1056                    TransmitType::Direct(sender_key),
1057                ))
1058            },
1059            HotShotEvent::VidDisperseSend(proposal, sender) => {
1060                self.handle_vid_disperse_proposal(proposal, &sender).await;
1061                None
1062            },
1063            HotShotEvent::DaProposalSend(proposal, sender) => {
1064                *maybe_action = Some(HotShotAction::DaPropose);
1065
1066                let message = if self
1067                    .upgrade_lock
1068                    .epochs_enabled(proposal.data.view_number())
1069                {
1070                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1071                        DaConsensusMessage::DaProposal2(proposal),
1072                    ))
1073                } else {
1074                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1075                        DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1076                    ))
1077                };
1078
1079                Some((sender, message, TransmitType::DaCommitteeBroadcast))
1080            },
1081            HotShotEvent::DaVoteSend(vote) => {
1082                *maybe_action = Some(HotShotAction::DaVote);
1083                let view_number = vote.view_number();
1084                let leader = match self
1085                    .membership_coordinator
1086                    .membership_for_epoch(vote.epoch())
1087                    .ok()?
1088                    .leader(view_number)
1089                {
1090                    Ok(l) => l,
1091                    Err(e) => {
1092                        tracing::warn!(
1093                            "Failed to calculate leader for view number {view_number}. Error: \
1094                             {e:?}"
1095                        );
1096                        return None;
1097                    },
1098                };
1099
1100                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1101                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1102                        DaConsensusMessage::DaVote2(vote.clone()),
1103                    ))
1104                } else {
1105                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1106                        DaConsensusMessage::DaVote(vote.clone().to_vote()),
1107                    ))
1108                };
1109
1110                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1111            },
1112            HotShotEvent::DacSend(certificate, sender) => {
1113                *maybe_action = Some(HotShotAction::DaCert);
1114                let message = if self.upgrade_lock.epochs_enabled(certificate.view_number()) {
1115                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1116                        DaConsensusMessage::DaCertificate2(certificate),
1117                    ))
1118                } else {
1119                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1120                        DaConsensusMessage::DaCertificate(certificate.to_dac()),
1121                    ))
1122                };
1123
1124                Some((sender, message, TransmitType::Broadcast))
1125            },
1126            HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1127                let view_number = vote.view_number() + vote.date().relay;
1128                let leader = match self
1129                    .membership_coordinator
1130                    .membership_for_epoch(self.epoch)
1131                    .ok()?
1132                    .leader(view_number)
1133                {
1134                    Ok(l) => l,
1135                    Err(e) => {
1136                        tracing::warn!(
1137                            "Failed to calculate leader for view number {view_number}. Error: \
1138                             {e:?}"
1139                        );
1140                        return None;
1141                    },
1142                };
1143                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1144                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1145                        GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1146                    ))
1147                } else {
1148                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1149                        GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1150                    ))
1151                };
1152
1153                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1154            },
1155            HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1156                *maybe_action = Some(HotShotAction::ViewSyncVote);
1157                let view_number = vote.view_number() + vote.date().relay;
1158                let leader = match self
1159                    .membership_coordinator
1160                    .membership_for_epoch(self.epoch)
1161                    .ok()?
1162                    .leader(view_number)
1163                {
1164                    Ok(l) => l,
1165                    Err(e) => {
1166                        tracing::warn!(
1167                            "Failed to calculate leader for view number {view_number}. Error: \
1168                             {e:?}"
1169                        );
1170                        return None;
1171                    },
1172                };
1173                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1174                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1175                        GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1176                    ))
1177                } else {
1178                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1179                        GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1180                    ))
1181                };
1182
1183                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1184            },
1185            HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1186                *maybe_action = Some(HotShotAction::ViewSyncVote);
1187                let view_number = vote.view_number() + vote.date().relay;
1188                let leader = match self
1189                    .membership_coordinator
1190                    .membership_for_epoch(self.epoch)
1191                    .ok()?
1192                    .leader(view_number)
1193                {
1194                    Ok(l) => l,
1195                    Err(e) => {
1196                        tracing::warn!(
1197                            "Failed to calculate leader for view number {view_number:?}. Error: \
1198                             {e:?}"
1199                        );
1200                        return None;
1201                    },
1202                };
1203                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1204                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1205                        GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1206                    ))
1207                } else {
1208                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1209                        GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1210                    ))
1211                };
1212
1213                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1214            },
1215            HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1216                let view_number = certificate.view_number();
1217                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1218                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1219                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1220                    ))
1221                } else {
1222                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1223                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1224                    ))
1225                };
1226
1227                Some((sender, message, TransmitType::Broadcast))
1228            },
1229            HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1230                let view_number = certificate.view_number();
1231                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1232                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1233                        GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1234                    ))
1235                } else {
1236                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1237                        GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1238                    ))
1239                };
1240
1241                Some((sender, message, TransmitType::Broadcast))
1242            },
1243            HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1244                let view_number = certificate.view_number();
1245                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1246                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1247                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1248                    ))
1249                } else {
1250                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1251                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1252                    ))
1253                };
1254
1255                Some((sender, message, TransmitType::Broadcast))
1256            },
1257            HotShotEvent::TimeoutVoteSend(vote) => {
1258                *maybe_action = Some(HotShotAction::TimeoutVote);
1259                let view_number = vote.view_number() + 1;
1260                let leader = match self
1261                    .membership_coordinator
1262                    .membership_for_epoch(self.epoch)
1263                    .ok()?
1264                    .leader(view_number)
1265                {
1266                    Ok(l) => l,
1267                    Err(e) => {
1268                        tracing::warn!(
1269                            "Failed to calculate leader for view number {view_number}. Error: \
1270                             {e:?}"
1271                        );
1272                        return None;
1273                    },
1274                };
1275                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1276                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1277                        GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1278                    ))
1279                } else {
1280                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1281                        GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1282                    ))
1283                };
1284
1285                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1286            },
1287            HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1288                sender,
1289                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1290                    GeneralConsensusMessage::UpgradeProposal(proposal),
1291                )),
1292                TransmitType::Broadcast,
1293            )),
1294            HotShotEvent::UpgradeVoteSend(vote) => {
1295                tracing::error!("Sending upgrade vote!");
1296                let view_number = vote.view_number();
1297                let leader = match self
1298                    .membership_coordinator
1299                    .membership_for_epoch(self.epoch)
1300                    .ok()?
1301                    .leader(view_number)
1302                {
1303                    Ok(l) => l,
1304                    Err(e) => {
1305                        tracing::warn!(
1306                            "Failed to calculate leader for view number {view_number}. Error: \
1307                             {e:?}"
1308                        );
1309                        return None;
1310                    },
1311                };
1312                Some((
1313                    vote.signing_key(),
1314                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1315                        GeneralConsensusMessage::UpgradeVote(vote.clone()),
1316                    )),
1317                    TransmitType::Direct(leader),
1318                ))
1319            },
1320            HotShotEvent::ViewChange(view, epoch) => {
1321                self.view = view;
1322                if epoch > self.epoch {
1323                    self.epoch = epoch;
1324                }
1325                let keep_view = ViewNumber::new(view.saturating_sub(1));
1326                self.cancel_tasks(keep_view);
1327                let net = Arc::clone(&self.network);
1328                let epoch = self.epoch.map(|x| x.u64().into());
1329                let membership_coordinator = self.membership_coordinator.clone();
1330                spawn(async move {
1331                    net.update_view::<TYPES>(keep_view.u64().into(), epoch, membership_coordinator)
1332                        .await;
1333                });
1334                None
1335            },
1336            HotShotEvent::VidRequestSend(req, sender, to) => Some((
1337                sender,
1338                MessageKind::Data(DataMessage::RequestData(req)),
1339                TransmitType::Direct(to),
1340            )),
1341            HotShotEvent::VidResponseSend(sender, to, proposal) => {
1342                let epochs_enabled = self
1343                    .upgrade_lock
1344                    .epochs_enabled(proposal.data.view_number());
1345                let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
1346                let message = match proposal.data {
1347                    VidDisperseShare::V0(data) => {
1348                        if epochs_enabled {
1349                            tracing::warn!(
1350                                "Epochs are enabled for view {} but still receive \
1351                                 VidDisperseShare0",
1352                                data.view_number()
1353                            );
1354                            return None;
1355                        }
1356                        let vid_share_proposal = Proposal {
1357                            data,
1358                            signature: proposal.signature,
1359                            _pd: proposal._pd,
1360                        };
1361                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1362                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1363                                vid_share_proposal,
1364                            )),
1365                        )))
1366                    },
1367                    VidDisperseShare::V1(data) => {
1368                        if !epochs_enabled {
1369                            tracing::warn!(
1370                                "Epochs are enabled for view {} but didn't receive \
1371                                 VidDisperseShare0",
1372                                data.view_number()
1373                            );
1374                            return None;
1375                        }
1376                        if upgraded_vid2 {
1377                            tracing::warn!(
1378                                "VID2 upgrade is enabled for view {} but didn't receive \
1379                                 VidDisperseShare2",
1380                                data.view_number()
1381                            );
1382                            return None;
1383                        }
1384                        let vid_share_proposal = Proposal {
1385                            data,
1386                            signature: proposal.signature,
1387                            _pd: proposal._pd,
1388                        };
1389                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1390                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
1391                                vid_share_proposal,
1392                            )),
1393                        )))
1394                    },
1395                    VidDisperseShare::V2(data) => {
1396                        if !upgraded_vid2 {
1397                            tracing::warn!(
1398                                "VID2 upgrade is not enabled for view {} but receive \
1399                                 VidDisperseShare2",
1400                                data.view_number()
1401                            );
1402                            return None;
1403                        }
1404                        let vid_share_proposal = Proposal {
1405                            data,
1406                            signature: proposal.signature,
1407                            _pd: proposal._pd,
1408                        };
1409                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1410                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1411                                vid_share_proposal,
1412                            )),
1413                        )))
1414                    },
1415                };
1416                Some((sender, message, TransmitType::Direct(to)))
1417            },
1418            HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1419                sender,
1420                MessageKind::Consensus(SequencingMessage::General(
1421                    GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1422                )),
1423                TransmitType::Direct(leader),
1424            )),
1425            HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1426                let message = if self
1427                    .upgrade_lock
1428                    .proposal2_version(epoch_root_qc.view_number())
1429                {
1430                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1431                        GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1432                    ))
1433                } else {
1434                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1435                        GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1436                    ))
1437                };
1438
1439                Some((sender, message, TransmitType::Direct(leader)))
1440            },
1441            HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1442                sender,
1443                MessageKind::Consensus(SequencingMessage::General(
1444                    GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1445                )),
1446                TransmitType::Broadcast,
1447            )),
1448            _ => None,
1449        }
1450    }
1451
1452    /// Creates a network message and spawns a task that transmits it on the wire.
1453    async fn spawn_transmit_task(
1454        &mut self,
1455        message_kind: MessageKind<TYPES>,
1456        maybe_action: Option<HotShotAction>,
1457        transmit: TransmitType<TYPES>,
1458        sender: TYPES::SignatureKey,
1459    ) {
1460        let broadcast_delay = match &message_kind {
1461            MessageKind::Consensus(
1462                SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1463                | SequencingMessage::Da(_),
1464            ) => BroadcastDelay::View(*message_kind.view_number()),
1465            _ => BroadcastDelay::None,
1466        };
1467        let message = Message {
1468            sender,
1469            kind: message_kind,
1470        };
1471        let view_number = message.kind.view_number();
1472        let epoch = message.kind.epoch();
1473        let committee_topic = Topic::Global;
1474        let Ok(mem) = self
1475            .membership_coordinator
1476            .stake_table_for_epoch(self.epoch)
1477        else {
1478            return;
1479        };
1480        let da_committee: BTreeSet<_> = mem.da_committee_members(view_number).cloned().collect();
1481        let network = Arc::clone(&self.network);
1482        let storage = self.storage.clone();
1483        let storage_metrics = Arc::clone(&self.storage_metrics);
1484        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1485        let upgrade_lock = self.upgrade_lock.clone();
1486        let handle = spawn(async move {
1487            if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
1488                maybe_action,
1489                storage.clone(),
1490                consensus,
1491                view_number,
1492                epoch,
1493            )
1494            .await
1495            .is_err()
1496            {
1497                return;
1498            }
1499            if let MessageKind::Consensus(SequencingMessage::General(
1500                GeneralConsensusMessage::Proposal(prop),
1501            )) = &message.kind
1502            {
1503                let now = Instant::now();
1504                if storage
1505                    .append_proposal2(&convert_proposal(prop.clone()))
1506                    .await
1507                    .is_err()
1508                {
1509                    return;
1510                }
1511                storage_metrics
1512                    .append_quorum_duration
1513                    .add_point(now.elapsed().as_secs_f64());
1514            }
1515
1516            let serialized_message = match upgrade_lock.serialize(&message) {
1517                Ok(serialized) => serialized,
1518                Err(e) => {
1519                    tracing::error!("Failed to serialize message: {e}");
1520                    return;
1521                },
1522            };
1523
1524            let transmit_result = match transmit {
1525                TransmitType::Direct(recipient) => {
1526                    network
1527                        .direct_message(view_number.u64().into(), serialized_message, recipient)
1528                        .await
1529                },
1530                TransmitType::Broadcast => {
1531                    network
1532                        .broadcast_message(
1533                            view_number.u64().into(),
1534                            serialized_message,
1535                            committee_topic,
1536                            broadcast_delay,
1537                        )
1538                        .await
1539                },
1540                TransmitType::DaCommitteeBroadcast => {
1541                    network
1542                        .da_broadcast_message(
1543                            view_number.u64().into(),
1544                            serialized_message,
1545                            da_committee.iter().cloned().collect(),
1546                            broadcast_delay,
1547                        )
1548                        .await
1549                },
1550            };
1551
1552            match transmit_result {
1553                Ok(()) => {},
1554                Err(e) => {
1555                    LogEvent::NetworkSendFailure.record();
1556                    tracing::debug!("Failed to send message task: {e:?}");
1557                },
1558            }
1559        });
1560        self.transmit_tasks
1561            .entry(view_number)
1562            .or_default()
1563            .push(handle);
1564    }
1565}
1566
1567/// A module with test helpers
1568pub mod test {
1569    use std::ops::{Deref, DerefMut};
1570
1571    use async_trait::async_trait;
1572
1573    use super::{
1574        Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1575        Receiver, Result, Sender, Storage, TaskState, TransmitType,
1576    };
1577
1578    /// A dynamic type alias for a function that takes the result of `NetworkEventTaskState::parse_event`
1579    /// and changes it before transmitting on the network.
1580    pub type ModifierClosure<TYPES> = dyn Fn(
1581            &mut <TYPES as NodeType>::SignatureKey,
1582            &mut MessageKind<TYPES>,
1583            &mut TransmitType<TYPES>,
1584            &<TYPES as NodeType>::Membership,
1585        ) + Send
1586        + Sync;
1587
1588    /// A helper wrapper around `NetworkEventTaskState` that can modify its behaviour for tests
1589    pub struct NetworkEventTaskStateModifier<
1590        TYPES: NodeType,
1591        NET: ConnectedNetwork<TYPES::SignatureKey>,
1592        S: Storage<TYPES>,
1593    > {
1594        /// The real `NetworkEventTaskState`
1595        pub network_event_task_state: NetworkEventTaskState<TYPES, NET, S>,
1596        /// A function that takes the result of `NetworkEventTaskState::parse_event` and
1597        /// changes it before transmitting on the network.
1598        pub modifier: Arc<ModifierClosure<TYPES>>,
1599    }
1600
1601    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1602        NetworkEventTaskStateModifier<TYPES, NET, S>
1603    {
1604        /// Handles the received event modifying it before sending on the network.
1605        pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1606            let mut maybe_action = None;
1607            if let Some((mut sender, mut message_kind, mut transmit)) =
1608                self.parse_event(event, &mut maybe_action).await
1609            {
1610                // Modify the values acquired by parsing the event.
1611                (self.modifier)(
1612                    &mut sender,
1613                    &mut message_kind,
1614                    &mut transmit,
1615                    self.membership_coordinator.membership(),
1616                );
1617
1618                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1619                    .await;
1620            }
1621        }
1622    }
1623
1624    #[async_trait]
1625    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1626        TaskState for NetworkEventTaskStateModifier<TYPES, NET, S>
1627    {
1628        type Event = HotShotEvent<TYPES>;
1629
1630        async fn handle_event(
1631            &mut self,
1632            event: Arc<Self::Event>,
1633            _sender: &Sender<Arc<Self::Event>>,
1634            _receiver: &Receiver<Arc<Self::Event>>,
1635        ) -> Result<()> {
1636            self.handle(event).await;
1637
1638            Ok(())
1639        }
1640
1641        fn cancel_subtasks(&mut self) {}
1642    }
1643
1644    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> Deref
1645        for NetworkEventTaskStateModifier<TYPES, NET, S>
1646    {
1647        type Target = NetworkEventTaskState<TYPES, NET, S>;
1648
1649        fn deref(&self) -> &Self::Target {
1650            &self.network_event_task_state
1651        }
1652    }
1653
1654    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> DerefMut
1655        for NetworkEventTaskStateModifier<TYPES, NET, S>
1656    {
1657        fn deref_mut(&mut self) -> &mut Self::Target {
1658            &mut self.network_event_task_state
1659        }
1660    }
1661}