1#[cfg(feature = "docs")]
12pub mod documentation;
13
14use committable::Committable;
15use futures::future::{Either, select};
16use hotshot_types::{
17 drb::{DrbResult, INITIAL_DRB_RESULT, drb_difficulty_selector},
18 epoch_membership::EpochMembershipCoordinator,
19 message::UpgradeLock,
20 simple_certificate::{CertificatePair, LightClientStateUpdateCertificateV2},
21 traits::{
22 block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
23 signature_key::StateSignatureKey, storage::Storage,
24 },
25 utils::{epoch_from_block_number, is_ge_epoch_root},
26};
27use rand::Rng;
28
29pub mod traits;
31pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36use versions::{EPOCH_VERSION, Upgrade};
37
38pub mod helpers;
40
41use std::{
42 collections::{BTreeMap, HashMap},
43 num::NonZeroUsize,
44 sync::Arc,
45 time::Duration,
46};
47
48use alloy::primitives::U256;
49use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
50use async_lock::RwLock;
51use async_trait::async_trait;
52use futures::join;
53use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
54use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
55pub use hotshot_types::error::HotShotError;
58use hotshot_types::{
59 HotShotConfig,
60 consensus::{
61 Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
62 ViewInner,
63 },
64 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
65 data::{EpochNumber, Leaf2, ViewNumber},
66 event::{EventType, LeafInfo},
67 message::{DataMessage, Message, MessageKind, Proposal},
68 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
69 stake_table::HSStakeTable,
70 storage_metrics::StorageMetricsValue,
71 traits::{
72 consensus_api::ConsensusApi, network::ConnectedNetwork, node_implementation::NodeType,
73 signature_key::SignatureKey, states::ValidatedState,
74 },
75 utils::{genesis_epoch_from_version, option_epoch_from_block_number},
76};
77use hotshot_utils::warn;
78pub use rand;
80use tokio::{spawn, time::sleep};
81use tracing::{debug, instrument, trace};
82
83use crate::{
86 tasks::{add_consensus_tasks, add_network_tasks},
87 traits::NodeImplementation,
88 types::{Event, SystemContextHandle},
89};
90
91pub const H_512: usize = 64;
93pub const H_256: usize = 32;
95
96pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>> {
98 public_key: TYPES::SignatureKey,
100
101 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
103
104 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
106
107 pub config: HotShotConfig<TYPES>,
109
110 pub network: Arc<I::Network>,
112
113 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
115
116 metrics: Arc<ConsensusMetricsValue>,
118
119 consensus: OuterConsensus<TYPES>,
121
122 instance_state: Arc<TYPES::InstanceState>,
124
125 start_view: ViewNumber,
127
128 start_epoch: Option<EpochNumber>,
130
131 output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
133
134 pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
136
137 anchored_leaf: Leaf2<TYPES>,
139
140 #[allow(clippy::type_complexity)]
142 internal_event_stream: (
143 Sender<Arc<HotShotEvent<TYPES>>>,
144 InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
145 ),
146
147 pub id: u64,
149
150 pub storage: I::Storage,
152
153 pub storage_metrics: Arc<StorageMetricsValue>,
155
156 pub upgrade_lock: UpgradeLock<TYPES>,
158}
159impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Clone for SystemContext<TYPES, I> {
160 #![allow(deprecated)]
161 fn clone(&self) -> Self {
162 Self {
163 public_key: self.public_key.clone(),
164 private_key: self.private_key.clone(),
165 state_private_key: self.state_private_key.clone(),
166 config: self.config.clone(),
167 network: Arc::clone(&self.network),
168 membership_coordinator: self.membership_coordinator.clone(),
169 metrics: Arc::clone(&self.metrics),
170 consensus: self.consensus.clone(),
171 instance_state: Arc::clone(&self.instance_state),
172 start_view: self.start_view,
173 start_epoch: self.start_epoch,
174 output_event_stream: self.output_event_stream.clone(),
175 external_event_stream: self.external_event_stream.clone(),
176 anchored_leaf: self.anchored_leaf.clone(),
177 internal_event_stream: self.internal_event_stream.clone(),
178 id: self.id,
179 storage: self.storage.clone(),
180 storage_metrics: Arc::clone(&self.storage_metrics),
181 upgrade_lock: self.upgrade_lock.clone(),
182 }
183 }
184}
185
186impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
187 #![allow(deprecated)]
188 #[allow(clippy::too_many_arguments)]
199 pub async fn new(
200 public_key: TYPES::SignatureKey,
201 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
202 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
203 nonce: u64,
204 config: HotShotConfig<TYPES>,
205 upgrade: versions::Upgrade,
206 memberships: EpochMembershipCoordinator<TYPES>,
207 network: Arc<I::Network>,
208 initializer: HotShotInitializer<TYPES>,
209 consensus_metrics: ConsensusMetricsValue,
210 storage: I::Storage,
211 storage_metrics: StorageMetricsValue,
212 ) -> Arc<Self> {
213 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214 let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216 Self::new_from_channels(
217 public_key,
218 private_key,
219 state_private_key,
220 nonce,
221 config,
222 upgrade,
223 memberships,
224 network,
225 initializer,
226 consensus_metrics,
227 storage,
228 storage_metrics,
229 internal_chan,
230 external_chan,
231 )
232 .await
233 }
234
235 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
243 pub async fn new_from_channels(
244 public_key: TYPES::SignatureKey,
245 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
246 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
247 nonce: u64,
248 config: HotShotConfig<TYPES>,
249 upgrade: versions::Upgrade,
250 membership_coordinator: EpochMembershipCoordinator<TYPES>,
251 network: Arc<I::Network>,
252 initializer: HotShotInitializer<TYPES>,
253 consensus_metrics: ConsensusMetricsValue,
254 storage: I::Storage,
255 storage_metrics: StorageMetricsValue,
256 internal_channel: (
257 Sender<Arc<HotShotEvent<TYPES>>>,
258 Receiver<Arc<HotShotEvent<TYPES>>>,
259 ),
260 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
261 ) -> Arc<Self> {
262 debug!("Creating a new hotshot");
263
264 tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
265
266 let consensus_metrics = Arc::new(consensus_metrics);
267 let storage_metrics = Arc::new(storage_metrics);
268 let anchored_leaf = initializer.anchor_leaf;
269 let instance_state = initializer.instance_state;
270
271 let (internal_tx, internal_rx) = internal_channel;
272 let (mut external_tx, external_rx) = external_channel;
273
274 let mut internal_rx = internal_rx.new_receiver();
275
276 let mut external_rx = external_rx.new_receiver();
277
278 internal_rx.set_overflow(true);
281 external_rx.set_overflow(true);
283
284 tracing::warn!(
285 "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
286 upgrade.base,
287 upgrade.target
288 );
289 tracing::warn!(
290 "Loading previously decided upgrade certificate from storage: {:?}",
291 initializer.decided_upgrade_certificate
292 );
293
294 let upgrade_lock = UpgradeLock::<TYPES>::from_certificate(
295 upgrade,
296 &initializer.decided_upgrade_certificate,
297 );
298
299 let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
300 cert.data.new_version
301 } else {
302 upgrade.base
303 };
304
305 debug!("Setting DRB difficulty selector in membership");
306 let drb_difficulty_selector = drb_difficulty_selector(&config);
307
308 membership_coordinator.set_drb_difficulty_selector(drb_difficulty_selector);
309
310 for da_committee in &config.da_committees {
311 if current_version >= da_committee.start_version {
312 membership_coordinator.membership().add_da_committee(
313 da_committee.start_epoch.into(),
314 da_committee.committee.clone(),
315 );
316 }
317 }
318
319 let validated_state = initializer.anchor_state;
322
323 load_start_epoch_info(
324 membership_coordinator.membership(),
325 &initializer.start_epoch_info,
326 config.epoch_height,
327 config.epoch_start_block,
328 )
329 .await;
330
331 let epoch = initializer.high_qc.data.block_number.map(|block_number| {
333 EpochNumber::new(epoch_from_block_number(
334 block_number + 1,
335 config.epoch_height,
336 ))
337 });
338
339 let mut validated_state_map = BTreeMap::default();
341 validated_state_map.insert(
342 anchored_leaf.view_number(),
343 View {
344 view_inner: ViewInner::Leaf {
345 leaf: anchored_leaf.commit(),
346 state: Arc::clone(&validated_state),
347 delta: initializer.anchor_state_delta,
348 epoch,
349 },
350 },
351 );
352 for (view_num, inner) in initializer.undecided_state {
353 validated_state_map.insert(view_num, inner);
354 }
355
356 let mut saved_leaves = HashMap::new();
357 let mut saved_payloads = BTreeMap::new();
358 saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
359
360 for (_, leaf) in initializer.undecided_leaves {
361 saved_leaves.insert(leaf.commit(), leaf.clone());
362 }
363 if let Some(payload) = anchored_leaf.block_payload() {
364 let metadata = anchored_leaf.block_header().metadata().clone();
365 saved_payloads.insert(
366 anchored_leaf.view_number(),
367 Arc::new(PayloadWithMetadata { payload, metadata }),
368 );
369 }
370 let high_qc_block_number = initializer.high_qc.data.block_number;
371 let (stake_table, success_threshold) =
372 if let Ok(epoch_membership) = membership_coordinator.stake_table_for_epoch(epoch) {
373 (
374 HSStakeTable::from_iter(epoch_membership.stake_table()),
375 epoch_membership.success_threshold(),
376 )
377 } else {
378 tracing::warn!(
379 "Failed to get stake table for epoch {:?} while creating vote participation",
380 epoch
381 );
382 (HSStakeTable::default(), U256::MAX)
383 };
384
385 let consensus = Consensus::new(
386 validated_state_map,
387 Some(initializer.saved_vid_shares),
388 anchored_leaf.view_number(),
389 epoch,
390 anchored_leaf.view_number(),
391 anchored_leaf.view_number(),
392 initializer.last_actioned_view,
393 initializer.saved_proposals,
394 saved_leaves,
395 saved_payloads,
396 initializer.high_qc,
397 initializer.next_epoch_high_qc,
398 Arc::clone(&consensus_metrics),
399 config.epoch_height,
400 initializer.state_cert,
401 config.drb_difficulty,
402 config.drb_upgrade_difficulty,
403 stake_table,
404 success_threshold,
405 );
406
407 let consensus = Arc::new(RwLock::new(consensus));
408
409 if let Some(epoch) = epoch {
410 tracing::info!(
411 "Triggering catchup for epoch {} and next epoch {}",
412 epoch,
413 epoch + 1
414 );
415 let _ = membership_coordinator.membership_for_epoch(Some(epoch));
417 let _ = membership_coordinator.membership_for_epoch(Some(epoch + 1));
418 if let Some(high_qc_block_number) = high_qc_block_number
421 && is_ge_epoch_root(high_qc_block_number, config.epoch_height)
422 {
423 let _ = membership_coordinator.stake_table_for_epoch(Some(epoch + 2));
424 }
425
426 if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
427 tracing::error!("Writing DRB result for epoch {}", epoch + 1);
428 if let Ok(mem) = membership_coordinator.stake_table_for_epoch(Some(epoch + 1)) {
429 mem.add_drb_result(drb_result);
430 }
431 }
432 }
433
434 external_tx.set_await_active(false);
437
438 let inner: Arc<SystemContext<TYPES, I>> = Arc::new(SystemContext {
439 id: nonce,
440 consensus: OuterConsensus::new(consensus),
441 instance_state: Arc::new(instance_state),
442 public_key,
443 private_key,
444 state_private_key,
445 config,
446 start_view: initializer.start_view,
447 start_epoch: initializer.start_epoch,
448 network,
449 membership_coordinator,
450 metrics: Arc::clone(&consensus_metrics),
451 internal_event_stream: (internal_tx, internal_rx.deactivate()),
452 output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
453 external_event_stream: (external_tx, external_rx.deactivate()),
454 anchored_leaf: anchored_leaf.clone(),
455 storage,
456 storage_metrics,
457 upgrade_lock,
458 });
459
460 inner
461 }
462
463 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
468 pub async fn start_consensus(&self) {
469 #[cfg(all(feature = "rewind", not(debug_assertions)))]
470 compile_error!("Cannot run rewind in production builds!");
471
472 debug!("Starting Consensus");
473 let consensus = self.consensus.read().await;
474
475 let first_epoch = option_epoch_from_block_number(
476 self.upgrade_lock.upgrade().base >= EPOCH_VERSION,
477 self.config.epoch_start_block,
478 self.config.epoch_height,
479 );
480 let initial_view_change_epoch = self.start_epoch.max(first_epoch);
484 #[allow(clippy::panic)]
485 self.internal_event_stream
486 .0
487 .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
488 self.start_view,
489 initial_view_change_epoch,
490 )))
491 .await
492 .unwrap_or_else(|_| {
493 panic!(
494 "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
495 self.start_view, initial_view_change_epoch,
496 )
497 });
498
499 let event_stream = self.internal_event_stream.0.clone();
501 let next_view_timeout = self.config.next_view_timeout;
502 let start_view = self.start_view;
503 let start_epoch = self.start_epoch;
504
505 spawn({
508 async move {
509 sleep(Duration::from_millis(next_view_timeout)).await;
510 broadcast_event(
511 Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
512 &event_stream,
513 )
514 .await;
515 }
516 });
517 #[allow(clippy::panic)]
518 self.internal_event_stream
519 .0
520 .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
521 consensus.high_qc().clone(),
522 ))))
523 .await
524 .unwrap_or_else(|_| {
525 panic!(
526 "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
527 consensus.high_qc()
528 )
529 });
530
531 {
532 if self.anchored_leaf.view_number() == ViewNumber::genesis() {
535 let (validated_state, state_delta) =
536 TYPES::ValidatedState::genesis(&self.instance_state);
537
538 let qc = QuorumCertificate2::genesis(
539 &validated_state,
540 self.instance_state.as_ref(),
541 self.upgrade_lock.upgrade(),
542 )
543 .await;
544
545 broadcast_event(
546 Event {
547 view_number: self.anchored_leaf.view_number(),
548 event: EventType::Decide {
549 leaf_chain: Arc::new(vec![LeafInfo::new(
550 self.anchored_leaf.clone(),
551 Arc::new(validated_state),
552 Some(Arc::new(state_delta)),
553 None,
554 None,
555 )]),
556 committing_qc: Arc::new(CertificatePair::non_epoch_change(qc)),
557 deciding_qc: None,
558 block_size: None,
559 },
560 },
561 &self.external_event_stream.0,
562 )
563 .await;
564 }
565 }
566 }
567
568 async fn send_external_event(&self, event: Event<TYPES>) {
570 debug!(?event, "send_external_event");
571 broadcast_event(event, &self.external_event_stream.0).await;
572 }
573
574 #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
580 pub async fn publish_transaction_async(
581 &self,
582 transaction: TYPES::Transaction,
583 ) -> Result<(), HotShotError<TYPES>> {
584 trace!("Adding transaction to our own queue");
585
586 let api = self.clone();
587
588 let consensus_reader = api.consensus.read().await;
589 let view_number = consensus_reader.cur_view();
590 let epoch = consensus_reader.cur_epoch();
591 drop(consensus_reader);
592
593 let message_kind: DataMessage<TYPES> =
595 DataMessage::SubmitTransaction(transaction.clone(), view_number);
596 let message = Message {
597 sender: api.public_key.clone(),
598 kind: MessageKind::from(message_kind),
599 };
600
601 let serialized_message = self.upgrade_lock.serialize(&message).map_err(|err| {
602 HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
603 })?;
604
605 let membership = match api.membership_coordinator.membership_for_epoch(epoch) {
606 Ok(m) => m,
607 Err(e) => return Err(HotShotError::InvalidState(e.message)),
608 };
609
610 spawn(async move {
611 let memberships_da_committee_members = membership
612 .da_committee_members(view_number)
613 .cloned()
614 .collect();
615
616 join! {
617 api
625 .network.da_broadcast_message(
626 view_number.u64().into(),
627 serialized_message,
628 memberships_da_committee_members,
629 BroadcastDelay::None,
630 ),
631 api
632 .send_external_event(Event {
633 view_number,
634 event: EventType::Transactions {
635 transactions: vec![transaction],
636 },
637 }),
638 }
639 });
640 Ok(())
641 }
642
643 #[must_use]
645 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
646 Arc::clone(&self.consensus.inner_consensus)
647 }
648
649 pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
651 Arc::clone(&self.instance_state)
652 }
653
654 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
658 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
659 self.consensus.read().await.decided_leaf()
660 }
661
662 #[must_use]
668 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
669 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
670 self.consensus.try_read().map(|guard| guard.decided_leaf())
671 }
672
673 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
678 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
679 Arc::clone(&self.consensus.read().await.decided_state())
680 }
681
682 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
690 pub async fn state(&self, view: ViewNumber) -> Option<Arc<TYPES::ValidatedState>> {
691 self.consensus.read().await.state(view).cloned()
692 }
693
694 #[allow(clippy::too_many_arguments)]
708 pub async fn init(
709 public_key: TYPES::SignatureKey,
710 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
711 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
712 node_id: u64,
713 config: HotShotConfig<TYPES>,
714 upgrade: versions::Upgrade,
715 memberships: EpochMembershipCoordinator<TYPES>,
716 network: Arc<I::Network>,
717 initializer: HotShotInitializer<TYPES>,
718 consensus_metrics: ConsensusMetricsValue,
719 storage: I::Storage,
720 storage_metrics: StorageMetricsValue,
721 ) -> Result<
722 (
723 SystemContextHandle<TYPES, I>,
724 Sender<Arc<HotShotEvent<TYPES>>>,
725 Receiver<Arc<HotShotEvent<TYPES>>>,
726 ),
727 HotShotError<TYPES>,
728 > {
729 let hotshot = Self::new(
730 public_key,
731 private_key,
732 state_private_key,
733 node_id,
734 config,
735 upgrade,
736 memberships,
737 network,
738 initializer,
739 consensus_metrics,
740 storage,
741 storage_metrics,
742 )
743 .await;
744 let handle = Arc::clone(&hotshot).run_tasks().await;
745 let (tx, rx) = hotshot.internal_event_stream.clone();
746
747 Ok((handle, tx, rx.activate()))
748 }
749 #[must_use]
751 pub fn next_view_timeout(&self) -> u64 {
752 self.config.next_view_timeout
753 }
754}
755
756impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
757 pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I> {
761 let consensus_registry = ConsensusTaskRegistry::new();
762 let network_registry = NetworkTaskRegistry::new();
763
764 let output_event_stream = self.external_event_stream.clone();
765 let internal_event_stream = self.internal_event_stream.clone();
766
767 let mut handle = SystemContextHandle {
768 consensus_registry,
769 network_registry,
770 output_event_stream: output_event_stream.clone(),
771 internal_event_stream: internal_event_stream.clone(),
772 hotshot: self.clone().into(),
773 storage: self.storage.clone(),
774 network: Arc::clone(&self.network),
775 membership_coordinator: self.membership_coordinator.clone(),
776 epoch_height: self.config.epoch_height,
777 };
778
779 add_network_tasks::<TYPES, I>(&mut handle).await;
780 add_consensus_tasks::<TYPES, I>(&mut handle).await;
781
782 handle
783 }
784}
785
786type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
788
789#[async_trait]
791pub trait TwinsHandlerState<TYPES, I>
792where
793 Self: std::fmt::Debug + Send + Sync,
794 TYPES: NodeType,
795 I: NodeImplementation<TYPES>,
796{
797 async fn send_handler(
799 &mut self,
800 event: &HotShotEvent<TYPES>,
801 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
802
803 async fn recv_handler(
805 &mut self,
806 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
807 ) -> Vec<HotShotEvent<TYPES>>;
808
809 fn fuse_channels(
813 &'static mut self,
814 left: Channel<HotShotEvent<TYPES>>,
815 right: Channel<HotShotEvent<TYPES>>,
816 ) -> Channel<HotShotEvent<TYPES>> {
817 let send_state = Arc::new(RwLock::new(self));
818 let recv_state = Arc::clone(&send_state);
819
820 let (left_sender, mut left_receiver) = (left.0, left.1);
821 let (right_sender, mut right_receiver) = (right.0, right.1);
822
823 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
825 let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
827 broadcast(EVENT_CHANNEL_SIZE);
828
829 let _recv_loop_handle = spawn(async move {
830 loop {
831 let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
832 Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
833 Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
834 };
835
836 let mut state = recv_state.write().await;
837 let mut result = state.recv_handler(&msg).await;
838
839 while let Some(event) = result.pop() {
840 let _ = sender_to_network.broadcast(event.into()).await;
841 }
842 }
843 });
844
845 let _send_loop_handle = spawn(async move {
846 loop {
847 if let Ok(msg) = receiver_from_network.recv().await {
848 let mut state = send_state.write().await;
849
850 let mut result = state.send_handler(&msg).await;
851
852 while let Some(event) = result.pop() {
853 match event {
854 Either::Left(msg) => {
855 let _ = left_sender.broadcast(msg.into()).await;
856 },
857 Either::Right(msg) => {
858 let _ = right_sender.broadcast(msg.into()).await;
859 },
860 }
861 }
862 }
863 }
864 });
865
866 (network_task_sender, network_task_receiver)
867 }
868
869 #[allow(clippy::too_many_arguments)]
870 async fn spawn_twin_handles(
874 &'static mut self,
875 public_key: TYPES::SignatureKey,
876 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
877 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
878 nonce: u64,
879 config: HotShotConfig<TYPES>,
880 upgrade: versions::Upgrade,
881 memberships: EpochMembershipCoordinator<TYPES>,
882 network: Arc<I::Network>,
883 initializer: HotShotInitializer<TYPES>,
884 consensus_metrics: ConsensusMetricsValue,
885 storage: I::Storage,
886 storage_metrics: StorageMetricsValue,
887 ) -> (SystemContextHandle<TYPES, I>, SystemContextHandle<TYPES, I>) {
888 let epoch_height = config.epoch_height;
889 let left_system_context = SystemContext::new(
890 public_key.clone(),
891 private_key.clone(),
892 state_private_key.clone(),
893 nonce,
894 config.clone(),
895 upgrade,
896 memberships.clone(),
897 Arc::clone(&network),
898 initializer.clone(),
899 consensus_metrics.clone(),
900 storage.clone(),
901 storage_metrics.clone(),
902 )
903 .await;
904 let right_system_context = SystemContext::new(
905 public_key,
906 private_key,
907 state_private_key,
908 nonce,
909 config,
910 upgrade,
911 memberships,
912 network,
913 initializer,
914 consensus_metrics,
915 storage,
916 storage_metrics,
917 )
918 .await;
919
920 let left_consensus_registry = ConsensusTaskRegistry::new();
922 let left_network_registry = NetworkTaskRegistry::new();
923
924 let right_consensus_registry = ConsensusTaskRegistry::new();
925 let right_network_registry = NetworkTaskRegistry::new();
926
927 let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
929 let left_external_event_stream =
930 (left_external_sender, left_external_receiver.deactivate());
931
932 let (right_external_sender, right_external_receiver) =
933 broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
934 let right_external_event_stream =
935 (right_external_sender, right_external_receiver.deactivate());
936
937 let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
939 let left_internal_event_stream = (
940 left_internal_sender.clone(),
941 left_internal_receiver.clone().deactivate(),
942 );
943
944 let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
945 let right_internal_event_stream = (
946 right_internal_sender.clone(),
947 right_internal_receiver.clone().deactivate(),
948 );
949
950 let mut left_handle = SystemContextHandle::<_, I> {
952 consensus_registry: left_consensus_registry,
953 network_registry: left_network_registry,
954 output_event_stream: left_external_event_stream.clone(),
955 internal_event_stream: left_internal_event_stream.clone(),
956 hotshot: Arc::clone(&left_system_context),
957 storage: left_system_context.storage.clone(),
958 network: Arc::clone(&left_system_context.network),
959 membership_coordinator: left_system_context.membership_coordinator.clone(),
960 epoch_height,
961 };
962
963 let mut right_handle = SystemContextHandle::<_, I> {
964 consensus_registry: right_consensus_registry,
965 network_registry: right_network_registry,
966 output_event_stream: right_external_event_stream.clone(),
967 internal_event_stream: right_internal_event_stream.clone(),
968 hotshot: Arc::clone(&right_system_context),
969 storage: right_system_context.storage.clone(),
970 network: Arc::clone(&right_system_context.network),
971 membership_coordinator: right_system_context.membership_coordinator.clone(),
972 epoch_height,
973 };
974
975 add_consensus_tasks::<TYPES, I>(&mut left_handle).await;
977 add_consensus_tasks::<TYPES, I>(&mut right_handle).await;
978
979 let fused_internal_event_stream = self.fuse_channels(
981 (left_internal_sender, left_internal_receiver),
982 (right_internal_sender, right_internal_receiver),
983 );
984
985 left_handle.internal_event_stream = (
987 fused_internal_event_stream.0,
988 fused_internal_event_stream.1.deactivate(),
989 );
990
991 add_network_tasks::<TYPES, I>(&mut left_handle).await;
993
994 left_handle.internal_event_stream = left_internal_event_stream.clone();
996
997 (left_handle, right_handle)
998 }
999}
1000
1001#[derive(Debug)]
1002pub struct RandomTwinsHandler;
1005
1006#[async_trait]
1007impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1008 for RandomTwinsHandler
1009{
1010 async fn send_handler(
1011 &mut self,
1012 event: &HotShotEvent<TYPES>,
1013 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1014 let random: bool = rand::thread_rng().r#gen();
1015
1016 #[allow(clippy::match_bool)]
1017 match random {
1018 true => vec![Either::Left(event.clone())],
1019 false => vec![Either::Right(event.clone())],
1020 }
1021 }
1022
1023 async fn recv_handler(
1024 &mut self,
1025 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1026 ) -> Vec<HotShotEvent<TYPES>> {
1027 match event {
1028 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1029 }
1030 }
1031}
1032
1033#[derive(Debug)]
1036pub struct DoubleTwinsHandler;
1037
1038#[async_trait]
1039impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1040 for DoubleTwinsHandler
1041{
1042 async fn send_handler(
1043 &mut self,
1044 event: &HotShotEvent<TYPES>,
1045 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1046 vec![Either::Left(event.clone()), Either::Right(event.clone())]
1047 }
1048
1049 async fn recv_handler(
1050 &mut self,
1051 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1052 ) -> Vec<HotShotEvent<TYPES>> {
1053 match event {
1054 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1055 }
1056 }
1057}
1058
1059#[async_trait]
1060impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusApi<TYPES, I>
1061 for SystemContextHandle<TYPES, I>
1062{
1063 fn total_nodes(&self) -> NonZeroUsize {
1064 self.hotshot.config.num_nodes_with_stake
1065 }
1066
1067 fn builder_timeout(&self) -> Duration {
1068 self.hotshot.config.builder_timeout
1069 }
1070
1071 async fn send_event(&self, event: Event<TYPES>) {
1072 debug!(?event, "send_event");
1073 broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1074 }
1075
1076 fn public_key(&self) -> &TYPES::SignatureKey {
1077 &self.hotshot.public_key
1078 }
1079
1080 fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1081 &self.hotshot.private_key
1082 }
1083
1084 fn state_private_key(
1085 &self,
1086 ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1087 &self.hotshot.state_private_key
1088 }
1089}
1090
1091#[derive(Clone, Debug, PartialEq)]
1092pub struct InitializerEpochInfo<TYPES: NodeType> {
1093 pub epoch: EpochNumber,
1094 pub drb_result: DrbResult,
1095 pub block_header: Option<TYPES::BlockHeader>,
1097}
1098
1099#[derive(Clone, Debug)]
1101pub struct HotShotInitializer<TYPES: NodeType> {
1102 pub instance_state: TYPES::InstanceState,
1104
1105 pub epoch_height: u64,
1107
1108 pub epoch_start_block: u64,
1110
1111 pub anchor_leaf: Leaf2<TYPES>,
1113
1114 pub anchor_state: Arc<TYPES::ValidatedState>,
1116
1117 pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1119
1120 pub start_view: ViewNumber,
1122
1123 pub last_actioned_view: ViewNumber,
1126
1127 pub start_epoch: Option<EpochNumber>,
1129
1130 pub high_qc: QuorumCertificate2<TYPES>,
1134
1135 pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
1137
1138 pub saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1140
1141 pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1143
1144 pub undecided_leaves: BTreeMap<ViewNumber, Leaf2<TYPES>>,
1147
1148 pub undecided_state: BTreeMap<ViewNumber, View<TYPES>>,
1150
1151 pub saved_vid_shares: VidShares<TYPES>,
1153
1154 pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1156
1157 pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1159}
1160
1161impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1162 pub async fn from_genesis(
1166 instance_state: TYPES::InstanceState,
1167 epoch_height: u64,
1168 epoch_start_block: u64,
1169 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1170 upgrade: Upgrade,
1171 ) -> Result<Self, HotShotError<TYPES>> {
1172 let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1173 let high_qc = QuorumCertificate2::genesis(&validated_state, &instance_state, upgrade).await;
1174
1175 Ok(Self {
1176 anchor_leaf: Leaf2::genesis(&validated_state, &instance_state, upgrade.base).await,
1177 anchor_state: Arc::new(validated_state),
1178 anchor_state_delta: Some(Arc::new(state_delta)),
1179 start_view: ViewNumber::new(0),
1180 start_epoch: genesis_epoch_from_version(upgrade.base),
1181 last_actioned_view: ViewNumber::new(0),
1182 saved_proposals: BTreeMap::new(),
1183 high_qc,
1184 next_epoch_high_qc: None,
1185 decided_upgrade_certificate: None,
1186 undecided_leaves: BTreeMap::new(),
1187 undecided_state: BTreeMap::new(),
1188 instance_state,
1189 saved_vid_shares: BTreeMap::new(),
1190 epoch_height,
1191 state_cert: None,
1192 epoch_start_block,
1193 start_epoch_info,
1194 })
1195 }
1196
1197 #[must_use]
1199 pub fn update_undecided(self) -> Self {
1200 let mut undecided_leaves = self.undecided_leaves.clone();
1201 let mut undecided_state = self.undecided_state.clone();
1202
1203 for proposal in self.saved_proposals.values() {
1204 if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1206 continue;
1207 }
1208
1209 undecided_leaves.insert(
1210 proposal.data.view_number(),
1211 Leaf2::from_quorum_proposal(&proposal.data),
1212 );
1213 }
1214
1215 for leaf in undecided_leaves.values() {
1216 let view_inner = ViewInner::Leaf {
1217 leaf: leaf.commit(),
1218 state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1219 delta: None,
1220 epoch: leaf.epoch(self.epoch_height),
1221 };
1222 let view = View { view_inner };
1223
1224 undecided_state.insert(leaf.view_number(), view);
1225 }
1226
1227 Self {
1228 undecided_leaves,
1229 undecided_state,
1230 ..self
1231 }
1232 }
1233
1234 #[allow(clippy::too_many_arguments)]
1242 pub fn load(
1243 instance_state: TYPES::InstanceState,
1244 epoch_height: u64,
1245 epoch_start_block: u64,
1246 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1247 anchor_leaf: Leaf2<TYPES>,
1248 (start_view, start_epoch): (ViewNumber, Option<EpochNumber>),
1249 (high_qc, next_epoch_high_qc): (
1250 QuorumCertificate2<TYPES>,
1251 Option<NextEpochQuorumCertificate2<TYPES>>,
1252 ),
1253 last_actioned_view: ViewNumber,
1254 saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1255 saved_vid_shares: VidShares<TYPES>,
1256 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1257 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1258 ) -> Self {
1259 let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1260 anchor_leaf.block_header(),
1261 ));
1262 let anchor_state_delta = None;
1263
1264 let initializer = Self {
1265 instance_state,
1266 epoch_height,
1267 epoch_start_block,
1268 anchor_leaf,
1269 anchor_state,
1270 anchor_state_delta,
1271 high_qc,
1272 start_view,
1273 start_epoch,
1274 last_actioned_view,
1275 saved_proposals,
1276 saved_vid_shares,
1277 next_epoch_high_qc,
1278 decided_upgrade_certificate,
1279 undecided_leaves: BTreeMap::new(),
1280 undecided_state: BTreeMap::new(),
1281 state_cert,
1282 start_epoch_info,
1283 };
1284
1285 initializer.update_undecided()
1286 }
1287}
1288
1289async fn load_start_epoch_info<TYPES: NodeType>(
1290 membership: &TYPES::Membership,
1291 start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1292 epoch_height: u64,
1293 epoch_start_block: u64,
1294) {
1295 let first_epoch_number =
1296 EpochNumber::new(epoch_from_block_number(epoch_start_block, epoch_height));
1297
1298 tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1299 membership.set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1300
1301 let mut sorted_epoch_info = start_epoch_info.clone();
1302 sorted_epoch_info.sort_by_key(|info| info.epoch);
1303 for epoch_info in sorted_epoch_info {
1304 if let Some(block_header) = &epoch_info.block_header {
1305 tracing::warn!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1306
1307 membership
1308 .add_epoch_root(block_header.clone())
1309 .await
1310 .unwrap_or_else(|err| {
1311 tracing::error!(
1313 "Failed to add epoch root for epoch {}: {err}",
1314 epoch_info.epoch
1315 );
1316 });
1317 }
1318 }
1319
1320 for epoch_info in start_epoch_info {
1321 tracing::warn!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1322 membership.add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1323 }
1324}