1use std::{
13 fmt::{self, Debug},
14 marker::PhantomData,
15 sync::Arc,
16};
17
18use committable::Committable;
19use hotshot_utils::anytrace::*;
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use vbs::version::Version;
23use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION, Upgrade, VID2_UPGRADE_VERSION};
24
/// Version tag `0.0`.
///
/// `UpgradeLock::deserialize` returns messages carrying this version as-is, without
/// comparing it against the version expected for the message's view.
pub const EXTERNAL_MESSAGE_VERSION: Version = Version { major: 0, minor: 0 };
27
28use crate::{
29 data::{
30 DaProposal, DaProposal2, EpochNumber, Leaf, Leaf2, QuorumProposal, QuorumProposal2,
31 QuorumProposal2Legacy, QuorumProposalWrapper, UpgradeProposal, VidDisperseShare0,
32 VidDisperseShare1, VidDisperseShare2, ViewNumber,
33 },
34 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
35 request_response::ProposalRequestPayload,
36 simple_certificate::{
37 DaCertificate, DaCertificate2, EpochRootQuorumCertificateV1, EpochRootQuorumCertificateV2,
38 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
39 ViewSyncCommitCertificate, ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate,
40 ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate, ViewSyncPreCommitCertificate2,
41 },
42 simple_vote::{
43 DaVote, DaVote2, EpochRootQuorumVote, EpochRootQuorumVote2, HasEpoch, QuorumVote,
44 QuorumVote2, TimeoutVote, TimeoutVote2, UpgradeVote, ViewSyncCommitVote,
45 ViewSyncCommitVote2, ViewSyncFinalizeVote, ViewSyncFinalizeVote2, ViewSyncPreCommitVote,
46 ViewSyncPreCommitVote2,
47 },
48 traits::{
49 election::Membership,
50 network::{DataRequest, ResponseMessage, ViewMessage},
51 node_implementation::NodeType,
52 signature_key::SignatureKey,
53 },
54 utils::mnemonic,
55 vote::HasViewNumber,
56};
57
/// Envelope pairing a message payload with the signature key of its sender.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
// Empty serde bounds: keep the derive from adding `Serialize`/`Deserialize` bounds on `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub struct Message<TYPES: NodeType> {
    /// Signature key of the sender.
    pub sender: TYPES::SignatureKey,

    /// The payload carried by this message.
    pub kind: MessageKind<TYPES>,
}
68
69impl<TYPES: NodeType> fmt::Debug for Message<TYPES> {
70 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
71 fmt.debug_struct("Message")
72 .field("sender", &mnemonic(&self.sender))
73 .field("kind", &self.kind)
74 .finish()
75 }
76}
77
78impl<TYPES: NodeType> HasViewNumber for Message<TYPES> {
79 fn view_number(&self) -> ViewNumber {
81 self.kind.view_number()
82 }
83}
84
/// Newtype wrapper around a list of [`Message`]s.
#[derive(Clone, Debug)]
pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
88
/// Payload-free tag describing what a message is for.
///
/// NOTE(review): nothing in this file constructs or matches on this enum; the variant
/// descriptions below are taken from the variant names — confirm against the users.
#[derive(PartialEq, Copy, Clone)]
pub enum MessagePurpose {
    /// A proposal.
    Proposal,
    /// The latest proposal.
    LatestProposal,
    /// The latest view-sync certificate.
    LatestViewSyncCertificate,
    /// A vote.
    Vote,
    /// A view-sync vote.
    ViewSyncVote,
    /// A view-sync certificate.
    ViewSyncCertificate,
    /// A DA certificate.
    DaCertificate,
    /// An internal message.
    Internal,
    /// A data message.
    Data,
    /// A VID disperse message.
    VidDisperse,
    /// An upgrade proposal.
    UpgradeProposal,
    /// An upgrade vote.
    UpgradeVote,
    /// An external message.
    External,
}
119
/// The payload of a [`Message`].
///
/// NOTE(review): serde numbers variants in declaration order; if the wire format encodes
/// variants by index, reordering them is a breaking change — confirm before reordering.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
// Empty serde bounds: keep the derive from adding `Serialize`/`Deserialize` bounds on `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub enum MessageKind<TYPES: NodeType> {
    /// A consensus message.
    Consensus(SequencingMessage<TYPES>),
    /// A data message.
    Data(DataMessage<TYPES>),
    /// Opaque bytes. This file never inspects them: such messages report view `1` and no
    /// epoch.
    External(Vec<u8>),
}
133
/// A set of message recipients, identified by signature key.
///
/// NOTE(review): not used in this file; the variant descriptions follow the variant names
/// and payload types — confirm against the networking code.
pub enum RecipientList<K: SignatureKey> {
    /// No explicit recipient keys (broadcast).
    Broadcast,
    /// A single recipient key.
    Direct(K),
    /// A list of recipient keys.
    Many(Vec<K>),
}
143
144impl<TYPES: NodeType> MessageKind<TYPES> {
145 pub fn from_consensus_message(m: SequencingMessage<TYPES>) -> Self {
149 Self::Consensus(m)
150 }
151}
152
153impl<TYPES: NodeType> From<DataMessage<TYPES>> for MessageKind<TYPES> {
154 fn from(m: DataMessage<TYPES>) -> Self {
155 Self::Data(m)
156 }
157}
158
159impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
160 fn view_number(&self) -> ViewNumber {
161 match &self {
162 MessageKind::Consensus(message) => message.view_number(),
163 MessageKind::Data(DataMessage::SubmitTransaction(_, v)) => *v,
164 MessageKind::Data(DataMessage::RequestData(msg)) => msg.view,
165 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
166 ResponseMessage::Found(m) => m.view_number(),
167 ResponseMessage::NotFound | ResponseMessage::Denied => ViewNumber::new(1),
168 },
169 MessageKind::External(_) => ViewNumber::new(1),
170 }
171 }
172}
173
174impl<TYPES: NodeType> HasEpoch for MessageKind<TYPES> {
175 fn epoch(&self) -> Option<EpochNumber> {
176 match &self {
177 MessageKind::Consensus(message) => message.epoch_number(),
178 MessageKind::Data(DataMessage::SubmitTransaction(..) | DataMessage::RequestData(_))
179 | MessageKind::External(_) => None,
180 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
181 ResponseMessage::Found(m) => m.epoch_number(),
182 ResponseMessage::NotFound | ResponseMessage::Denied => None,
183 },
184 }
185 }
186}
187
/// General (non-DA) consensus messages: proposals, votes, certificates, view-sync and
/// upgrade traffic, and proposal requests/responses.
///
/// NOTE(review): serde numbers variants in declaration order; if the wire format encodes
/// variants by index, reordering or removing variants is a breaking change — confirm
/// before touching the order.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
// Empty serde bounds: keep the derive from adding `Serialize`/`Deserialize` bounds on `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub enum GeneralConsensusMessage<TYPES: NodeType> {
    /// A signed [`QuorumProposal`].
    Proposal(Proposal<TYPES, QuorumProposal<TYPES>>),

    /// A [`QuorumVote`].
    Vote(QuorumVote<TYPES>),

    /// A view-sync pre-commit vote.
    ViewSyncPreCommitVote(ViewSyncPreCommitVote<TYPES>),

    /// A view-sync commit vote.
    ViewSyncCommitVote(ViewSyncCommitVote<TYPES>),

    /// A view-sync finalize vote.
    ViewSyncFinalizeVote(ViewSyncFinalizeVote<TYPES>),

    /// A view-sync pre-commit certificate.
    ViewSyncPreCommitCertificate(ViewSyncPreCommitCertificate<TYPES>),

    /// A view-sync commit certificate.
    ViewSyncCommitCertificate(ViewSyncCommitCertificate<TYPES>),

    /// A view-sync finalize certificate.
    ViewSyncFinalizeCertificate(ViewSyncFinalizeCertificate<TYPES>),

    /// A timeout vote.
    TimeoutVote(TimeoutVote<TYPES>),

    /// A signed [`UpgradeProposal`].
    UpgradeProposal(Proposal<TYPES, UpgradeProposal>),

    /// An upgrade vote.
    UpgradeVote(UpgradeVote<TYPES>),

    /// A proposal request payload together with a signature.
    /// NOTE(review): presumably the requester's signature over the payload — confirm.
    ProposalRequested(
        ProposalRequestPayload<TYPES>,
        <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
    ),

    /// A signed [`QuorumProposal`] sent as a response.
    /// NOTE(review): presumably answers `ProposalRequested` — confirm.
    ProposalResponse(Proposal<TYPES, QuorumProposal<TYPES>>),

    /// A signed [`QuorumProposal2Legacy`].
    Proposal2Legacy(Proposal<TYPES, QuorumProposal2Legacy<TYPES>>),

    /// A [`QuorumVote2`].
    Vote2(QuorumVote2<TYPES>),

    /// An epoch-root quorum vote.
    EpochRootQuorumVote(EpochRootQuorumVote<TYPES>),

    /// A signed [`QuorumProposal2Legacy`] sent as a response.
    ProposalResponse2Legacy(Proposal<TYPES, QuorumProposal2Legacy<TYPES>>),

    /// A quorum certificate, optionally accompanied by a next-epoch quorum certificate.
    /// The view and epoch of this message are read from the first certificate.
    HighQc(
        QuorumCertificate2<TYPES>,
        Option<NextEpochQuorumCertificate2<TYPES>>,
    ),

    /// A quorum certificate together with a next-epoch quorum certificate.
    /// The view and epoch of this message are read from the first certificate.
    ExtendedQc(
        QuorumCertificate2<TYPES>,
        NextEpochQuorumCertificate2<TYPES>,
    ),

    /// An epoch-root quorum certificate in its V1 form.
    EpochRootQcV1(EpochRootQuorumCertificateV1<TYPES>),

    /// A view-sync pre-commit vote (`2` variant of the vote type).
    ViewSyncPreCommitVote2(ViewSyncPreCommitVote2<TYPES>),

    /// A view-sync commit vote (`2` variant of the vote type).
    ViewSyncCommitVote2(ViewSyncCommitVote2<TYPES>),

    /// A view-sync finalize vote (`2` variant of the vote type).
    ViewSyncFinalizeVote2(ViewSyncFinalizeVote2<TYPES>),

    /// A view-sync pre-commit certificate (`2` variant of the certificate type).
    ViewSyncPreCommitCertificate2(ViewSyncPreCommitCertificate2<TYPES>),

    /// A view-sync commit certificate (`2` variant of the certificate type).
    ViewSyncCommitCertificate2(ViewSyncCommitCertificate2<TYPES>),

    /// A view-sync finalize certificate (`2` variant of the certificate type).
    ViewSyncFinalizeCertificate2(ViewSyncFinalizeCertificate2<TYPES>),

    /// A timeout vote (`2` variant of the vote type).
    TimeoutVote2(TimeoutVote2<TYPES>),

    /// An epoch-root quorum certificate in its V2 form.
    EpochRootQc(EpochRootQuorumCertificateV2<TYPES>),

    /// A signed [`QuorumProposal2`].
    Proposal2(Proposal<TYPES, QuorumProposal2<TYPES>>),

    /// A signed [`QuorumProposal2`] sent as a response.
    ProposalResponse2(Proposal<TYPES, QuorumProposal2<TYPES>>),

    /// An epoch-root quorum vote (`2` variant of the vote type).
    EpochRootQuorumVote2(EpochRootQuorumVote2<TYPES>),
}
294
/// Data-availability (DA) consensus messages: DA proposals, votes and certificates, plus
/// VID disperse shares.
///
/// NOTE(review): serde numbers variants in declaration order; if the wire format encodes
/// variants by index, reordering or removing variants is a breaking change — confirm
/// before touching the order.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)]
// Empty serde bounds: keep the derive from adding `Serialize`/`Deserialize` bounds on `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub enum DaConsensusMessage<TYPES: NodeType> {
    /// A signed [`DaProposal`].
    DaProposal(Proposal<TYPES, DaProposal<TYPES>>),

    /// A [`DaVote`].
    DaVote(DaVote<TYPES>),

    /// A [`DaCertificate`].
    DaCertificate(DaCertificate<TYPES>),

    /// A signed [`VidDisperseShare0`].
    VidDisperseMsg(Proposal<TYPES, VidDisperseShare0<TYPES>>),

    /// A signed [`DaProposal2`].
    DaProposal2(Proposal<TYPES, DaProposal2<TYPES>>),

    /// A [`DaVote2`].
    DaVote2(DaVote2<TYPES>),

    /// A [`DaCertificate2`].
    DaCertificate2(DaCertificate2<TYPES>),

    /// A signed [`VidDisperseShare1`].
    VidDisperseMsg1(Proposal<TYPES, VidDisperseShare1<TYPES>>),

    /// A signed [`VidDisperseShare2`].
    VidDisperseMsg2(Proposal<TYPES, VidDisperseShare2<TYPES>>),
}
328
/// A consensus message: either a general consensus message or a DA consensus message.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
// Empty serde bounds: keep the derive from adding `Serialize`/`Deserialize` bounds on `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
// The size difference between the two variants is accepted here; payloads are not boxed.
#[allow(clippy::large_enum_variant)]
pub enum SequencingMessage<TYPES: NodeType> {
    /// A general (non-DA) consensus message.
    General(GeneralConsensusMessage<TYPES>),

    /// A DA consensus message.
    Da(DaConsensusMessage<TYPES>),
}
340
341impl<TYPES: NodeType> SequencingMessage<TYPES> {
342 fn view_number(&self) -> ViewNumber {
344 match &self {
345 SequencingMessage::General(general_message) => {
346 match general_message {
347 GeneralConsensusMessage::Proposal(p) => {
348 p.data.view_number()
351 },
352 GeneralConsensusMessage::Proposal2Legacy(p) => {
353 p.data.view_number()
356 },
357 GeneralConsensusMessage::Proposal2(p) => {
358 p.data.view_number()
361 },
362 GeneralConsensusMessage::ProposalRequested(req, _) => req.view_number,
363 GeneralConsensusMessage::ProposalResponse(proposal) => {
364 proposal.data.view_number()
365 },
366 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
367 proposal.data.view_number()
368 },
369 GeneralConsensusMessage::ProposalResponse2(proposal) => {
370 proposal.data.view_number()
371 },
372 GeneralConsensusMessage::Vote(vote_message) => vote_message.view_number(),
373 GeneralConsensusMessage::Vote2(vote_message) => vote_message.view_number(),
374 GeneralConsensusMessage::TimeoutVote(message) => message.view_number(),
375 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => {
376 message.view_number()
377 },
378 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.view_number(),
379 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.view_number(),
380 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
381 message.view_number()
382 },
383 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => {
384 message.view_number()
385 },
386 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
387 message.view_number()
388 },
389 GeneralConsensusMessage::TimeoutVote2(message) => message.view_number(),
390 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => {
391 message.view_number()
392 },
393 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.view_number(),
394 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => {
395 message.view_number()
396 },
397 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
398 message.view_number()
399 },
400 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => {
401 message.view_number()
402 },
403 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
404 message.view_number()
405 },
406 GeneralConsensusMessage::UpgradeProposal(message) => message.data.view_number(),
407 GeneralConsensusMessage::UpgradeVote(message) => message.view_number(),
408 GeneralConsensusMessage::HighQc(qc, _)
409 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.view_number(),
410 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.view_number(),
411 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => vote.view_number(),
412 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.view_number(),
413 GeneralConsensusMessage::EpochRootQcV1(root_qc) => root_qc.view_number(),
414 }
415 },
416 SequencingMessage::Da(da_message) => {
417 match da_message {
418 DaConsensusMessage::DaProposal(p) => {
419 p.data.view_number()
422 },
423 DaConsensusMessage::DaVote(vote_message) => vote_message.view_number(),
424 DaConsensusMessage::DaCertificate(cert) => cert.view_number,
425 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.view_number(),
426 DaConsensusMessage::DaProposal2(p) => {
427 p.data.view_number()
430 },
431 DaConsensusMessage::DaVote2(vote_message) => vote_message.view_number(),
432 DaConsensusMessage::DaCertificate2(cert) => cert.view_number,
433 DaConsensusMessage::VidDisperseMsg1(disperse) => disperse.data.view_number(),
434 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.view_number(),
435 }
436 },
437 }
438 }
439
440 fn epoch_number(&self) -> Option<EpochNumber> {
442 match &self {
443 SequencingMessage::General(general_message) => {
444 match general_message {
445 GeneralConsensusMessage::Proposal(p) => {
446 p.data.epoch()
449 },
450 GeneralConsensusMessage::Proposal2Legacy(p) => {
451 p.data.epoch()
454 },
455 GeneralConsensusMessage::Proposal2(p) => {
456 p.data.epoch()
459 },
460 GeneralConsensusMessage::ProposalRequested(..) => None,
461 GeneralConsensusMessage::ProposalResponse(proposal) => proposal.data.epoch(),
462 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
463 proposal.data.epoch()
464 },
465 GeneralConsensusMessage::ProposalResponse2(proposal) => proposal.data.epoch(),
466 GeneralConsensusMessage::Vote(vote_message) => vote_message.epoch(),
467 GeneralConsensusMessage::Vote2(vote_message) => vote_message.epoch(),
468 GeneralConsensusMessage::TimeoutVote(message) => message.epoch(),
469 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => message.epoch(),
470 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.epoch(),
471 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.epoch(),
472 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
473 message.epoch()
474 },
475 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => message.epoch(),
476 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
477 message.epoch()
478 },
479 GeneralConsensusMessage::TimeoutVote2(message) => message.epoch(),
480 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => message.epoch(),
481 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.epoch(),
482 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => message.epoch(),
483 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
484 message.epoch()
485 },
486 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => message.epoch(),
487 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
488 message.epoch()
489 },
490 GeneralConsensusMessage::UpgradeProposal(message) => message.data.epoch(),
491 GeneralConsensusMessage::UpgradeVote(message) => message.epoch(),
492 GeneralConsensusMessage::HighQc(qc, _)
493 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.epoch(),
494 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.epoch(),
495 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => vote.epoch(),
496 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.epoch(),
497 GeneralConsensusMessage::EpochRootQcV1(root_qc) => root_qc.epoch(),
498 }
499 },
500 SequencingMessage::Da(da_message) => {
501 match da_message {
502 DaConsensusMessage::DaProposal(p) => {
503 p.data.epoch()
506 },
507 DaConsensusMessage::DaVote(vote_message) => vote_message.epoch(),
508 DaConsensusMessage::DaCertificate(cert) => cert.epoch(),
509 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.epoch(),
510 DaConsensusMessage::VidDisperseMsg1(disperse) => disperse.data.epoch(),
511 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.epoch(),
512 DaConsensusMessage::DaProposal2(p) => {
513 p.data.epoch()
516 },
517 DaConsensusMessage::DaVote2(vote_message) => vote_message.epoch(),
518 DaConsensusMessage::DaCertificate2(cert) => cert.epoch(),
519 }
520 },
521 }
522 }
523}
524
/// Non-consensus messages: transaction submission and data request/response.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
// Empty deserialize bound: keep the derive from adding a `Deserialize` bound on `TYPES`.
#[serde(bound(deserialize = ""))]
// The size difference between variants is accepted here; payloads are not boxed.
#[allow(clippy::large_enum_variant)]
pub enum DataMessage<TYPES: NodeType> {
    /// A transaction together with a view number; that view number is what
    /// `MessageKind::view_number` reports for this message.
    SubmitTransaction(TYPES::Transaction, ViewNumber),
    /// A data request; its `view` field is reported as the message's view.
    RequestData(DataRequest<TYPES>),
    /// The response to a data request: found, not found, or denied.
    DataResponse(ResponseMessage<TYPES>),
}
540
/// A piece of proposal data bundled with a signature.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
// Empty deserialize bound: keep the derive from adding `Deserialize` bounds on the
// type parameters (`PROPOSAL` is already required to be `DeserializeOwned`).
#[serde(bound(deserialize = ""))]
pub struct Proposal<TYPES: NodeType, PROPOSAL: HasViewNumber + HasEpoch + DeserializeOwned> {
    /// The proposed data.
    pub data: PROPOSAL,
    /// Signature accompanying the data. For quorum proposals, the `validate_signature`
    /// methods in this file check it with the view leader's key against the commitment
    /// of the leaf built from `data`.
    pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
    /// Zero-sized marker tying the proposal to `TYPES`.
    pub _pd: PhantomData<TYPES>,
}
554
555pub fn convert_proposal<TYPES, PROPOSAL, PROPOSAL2>(
557 proposal: Proposal<TYPES, PROPOSAL>,
558) -> Proposal<TYPES, PROPOSAL2>
559where
560 TYPES: NodeType,
561 PROPOSAL: HasViewNumber + HasEpoch + DeserializeOwned,
562 PROPOSAL2: HasViewNumber + HasEpoch + DeserializeOwned + From<PROPOSAL>,
563{
564 Proposal {
565 data: proposal.data.into(),
566 signature: proposal.signature,
567 _pd: proposal._pd,
568 }
569}
570
571impl<TYPES> Proposal<TYPES, QuorumProposal<TYPES>>
572where
573 TYPES: NodeType,
574{
575 pub async fn validate_signature(
579 &self,
580 membership: &TYPES::Membership,
581 _epoch_height: u64,
582 upgrade_lock: &UpgradeLock<TYPES>,
583 ) -> Result<()> {
584 let view_number = self.data.view_number();
585 let view_leader_key = membership.leader(view_number, None)?;
586 let proposed_leaf = Leaf::from_quorum_proposal(&self.data);
587
588 ensure!(
589 view_leader_key.validate(
590 &self.signature,
591 proposed_leaf.commit(upgrade_lock).await.as_ref()
592 ),
593 "Proposal signature is invalid."
594 );
595
596 Ok(())
597 }
598}
599
600impl<TYPES> Proposal<TYPES, QuorumProposalWrapper<TYPES>>
601where
602 TYPES: NodeType,
603{
604 pub async fn validate_signature(&self, membership: &EpochMembership<TYPES>) -> Result<()> {
608 let view_number = self.data.proposal.view_number();
609 let view_leader_key = membership.leader(view_number).await?;
610 let proposed_leaf = Leaf2::from_quorum_proposal(&self.data);
611
612 ensure!(
613 view_leader_key.validate(&self.signature, proposed_leaf.commit().as_ref()),
614 "Proposal signature is invalid."
615 );
616
617 Ok(())
618 }
619}
620
621impl<TYPES> Proposal<TYPES, QuorumProposal2<TYPES>>
622where
623 TYPES: NodeType,
624{
625 pub async fn validate_signature(
626 &self,
627 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
628 ) -> Result<()> {
629 let view_number = self.data.view_number();
630 let epoch = self.data.epoch().ok_or(error!("Epoch is not set"))?;
631 let membership = membership_coordinator
632 .membership_for_epoch(Some(epoch))
633 .await?;
634 let view_leader_key = membership.leader(view_number).await?;
635 let proposed_leaf =
636 Leaf2::from_quorum_proposal(&QuorumProposalWrapper::from(self.data.clone()));
637
638 ensure!(
639 view_leader_key.validate(&self.signature, proposed_leaf.commit().as_ref()),
640 "Proposal signature is invalid."
641 );
642
643 Ok(())
644 }
645}
646
/// Tracks the decided upgrade certificate (if any) next to the configured upgrade, and
/// answers "which version applies at this view?".
///
/// Clones share the same certificate slot, because it is held behind an `Arc`.
#[derive(Clone, Debug)]
pub struct UpgradeLock<TYPES: NodeType> {
    /// The decided upgrade certificate, or `None` if there is none. Shared between clones
    /// and guarded by a read/write lock.
    decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    /// The configured upgrade; its `base` and `target` versions are the only versions
    /// `version`, `serialize` and `deserialize` accept (besides the external-message
    /// version in `deserialize`).
    upgrade: Upgrade,
}
653
654impl<TYPES: NodeType> UpgradeLock<TYPES> {
655 pub fn new(upgrade: Upgrade) -> Self {
657 Self {
658 decided_upgrade_certificate: Arc::new(RwLock::new(None)),
659 upgrade,
660 }
661 }
662
663 pub fn from_certificate(
665 upgrade: Upgrade,
666 certificate: &Option<UpgradeCertificate<TYPES>>,
667 ) -> Self {
668 Self {
669 decided_upgrade_certificate: Arc::new(RwLock::new(certificate.clone())),
670 upgrade,
671 }
672 }
673
674 pub fn upgrade(&self) -> Upgrade {
675 self.upgrade
676 }
677
678 pub fn upgrade_view(&self) -> Option<ViewNumber> {
679 self.decided_upgrade_certificate
680 .read()
681 .as_ref()
682 .map(|cert| cert.data.new_version_first_view)
683 }
684
685 pub fn decided_upgrade_cert(&self) -> Option<UpgradeCertificate<TYPES>> {
686 self.decided_upgrade_certificate.read().clone()
687 }
688
689 pub fn set_decided_upgrade_cert<C>(&self, cert: C)
690 where
691 C: Into<Option<UpgradeCertificate<TYPES>>>,
692 {
693 *self.decided_upgrade_certificate.write() = cert.into()
694 }
695
696 pub fn apply<F>(&self, f: F)
698 where
699 F: FnOnce(&mut Option<UpgradeCertificate<TYPES>>),
700 {
701 let mut guard = self.decided_upgrade_certificate.write();
702 f(&mut *guard)
703 }
704
705 pub fn version(&self, view: ViewNumber) -> Result<Version> {
710 if let Some(cert) = &*self.decided_upgrade_certificate.read() {
711 if view >= cert.data.new_version_first_view {
712 if cert.data.new_version == self.upgrade.target {
713 Ok(self.upgrade.target)
714 } else {
715 bail!("The network has upgraded to a new version that we do not support!");
716 }
717 } else {
718 Ok(self.upgrade.base)
719 }
720 } else {
721 Ok(self.upgrade.base)
722 }
723 }
724
725 pub fn version_infallible(&self, view: ViewNumber) -> Version {
729 if let Some(cert) = &*self.decided_upgrade_certificate.read() {
730 if view >= cert.data.new_version_first_view {
731 cert.data.new_version
732 } else {
733 cert.data.old_version
734 }
735 } else {
736 self.upgrade.base
737 }
738 }
739
740 pub fn epochs_enabled(&self, view: ViewNumber) -> bool {
742 self.version_infallible(view) >= EPOCH_VERSION
743 }
744
745 pub fn proposal2_legacy_version(&self, view: ViewNumber) -> bool {
747 let version = self.version_infallible(view);
748 version >= EPOCH_VERSION && version < DRB_AND_HEADER_UPGRADE_VERSION
749 }
750
751 pub fn proposal2_version(&self, view: ViewNumber) -> bool {
753 let version = self.version_infallible(view);
754 version >= EPOCH_VERSION && version >= DRB_AND_HEADER_UPGRADE_VERSION
755 }
756
757 pub fn upgraded_drb_and_header(&self, view: ViewNumber) -> bool {
759 self.version_infallible(view) >= DRB_AND_HEADER_UPGRADE_VERSION
760 }
761
762 pub fn upgraded_vid2(&self, view: ViewNumber) -> bool {
763 self.version_infallible(view) >= VID2_UPGRADE_VERSION
764 }
765
766 pub fn serialize<M: HasViewNumber + Serialize>(&self, message: &M) -> Result<Vec<u8>> {
774 let view = message.view_number();
775
776 let version = self.version(view)?;
777
778 let serialized_message = if version == self.upgrade.base || version == self.upgrade.target {
779 versions::encode(version, message)
780 } else {
781 bail!(
782 "Attempted to serialize with version {version}, which is incompatible. This \
783 should be impossible."
784 );
785 };
786
787 serialized_message
788 .wrap()
789 .context(info!("Failed to serialize message!"))
790 }
791
792 pub fn deserialize<M: Debug + HasViewNumber + DeserializeOwned>(
802 &self,
803 message: &[u8],
804 ) -> Result<(M, Version)> {
805 let (version, deserialized_message) = versions::decode::<M>(message)
806 .wrap()
807 .context(info!("Failed to read version and message!"))?;
808
809 if EXTERNAL_MESSAGE_VERSION != version
810 && self.upgrade.base != version
811 && self.upgrade.target != version
812 {
813 bail!(warn!(
814 "Received a message with state version {version} which is invalid for its view: \
815 {:?}",
816 deserialized_message
817 ));
818 }
819
820 if version == EXTERNAL_MESSAGE_VERSION {
823 return Ok((deserialized_message, version));
824 }
825
826 let view = deserialized_message.view_number();
828
829 let expected_version = self.version(view)?;
831
832 if version != expected_version {
834 return Err(error!(format!(
835 "Message has invalid version number for its view. Expected: {expected_version}, \
836 Actual: {version}, View: {view:?}\n\n{deserialized_message:?}"
837 )));
838 };
839
840 Ok((deserialized_message, version))
841 }
842}