1use std::{
2 collections::{BTreeSet, HashMap, HashSet},
3 sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
8use async_lock::{Mutex, RwLock};
9use committable::Commitment;
10use hotshot_utils::{anytrace::*, *};
11use sha2::{Digest, Sha256};
12use versions::DRB_FIX_VERSION;
13
14use crate::{
15 PeerConfig,
16 data::{EpochNumber, Leaf2, ViewNumber},
17 drb::{DrbDifficultySelectorFn, DrbInput, DrbResult, compute_drb_result},
18 event::Event,
19 stake_table::HSStakeTable,
20 traits::{
21 block_contents::BlockHeader,
22 election::Membership,
23 node_implementation::NodeType,
24 storage::{
25 LoadDrbProgressFn, Storage, StoreDrbProgressFn, StoreDrbResultFn, load_drb_progress_fn,
26 store_drb_progress_fn, store_drb_result_fn,
27 },
28 },
29};
30
/// Map from an epoch to a (deactivated) receiver that resolves when the
/// in-flight stake-table catchup for that epoch completes or fails.
type EpochMap<TYPES> = HashMap<EpochNumber, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Set of epochs whose DRB result is currently being computed, used to
/// deduplicate concurrent DRB calculations.
type DrbMap = HashSet<EpochNumber>;

/// An epoch paired with the broadcast sender used to announce the outcome of
/// its catchup.
type EpochSender<TYPES> = (EpochNumber, Sender<Result<EpochMembership<TYPES>>>);
36
/// Coordinates stake-table catchup and DRB computation across epochs on top of
/// a shared `Membership`, deduplicating concurrent work per epoch.
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership state (stake tables, leaders, DRB results).
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Epochs with a stake-table catchup in flight, keyed to a receiver that
    /// resolves when the catchup finishes (or errors).
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Epochs whose DRB result is currently being computed.
    drb_calculation_map: Arc<Mutex<DrbMap>>,

    /// Number of blocks per epoch.
    pub epoch_height: u64,

    /// Callback used to persist partial DRB computation progress.
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback used to load previously persisted DRB progress.
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback used to persist a completed DRB result.
    store_drb_result_fn: StoreDrbResultFn,

    /// Selects the DRB difficulty from a block-header version; `None` until
    /// installed via `set_drb_difficulty_selector`.
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn>>>,
}
63
// Manual `Clone`: every field is shared (`Arc`) or `Copy`, so cloning is cheap
// and a derive (which would add a `TYPES: Clone` bound) is unnecessary.
impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}
78
79impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
80where
81 Self: Send,
82{
    /// Create a new coordinator over `membership`, wiring the DRB persistence
    /// callbacks to `storage`.
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            drb_calculation_map: Arc::default(),
            epoch_height,
            // Each helper wraps a clone of `storage` into a callback used by
            // the DRB computation / persistence paths.
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }
100
    /// Forward an external event channel to the underlying membership.
    pub async fn set_external_channel(&mut self, external_channel: Receiver<Event<TYPES>>) {
        self.membership
            .write()
            .await
            .set_external_channel(external_channel)
            .await;
    }
108
    /// Shared handle to the underlying membership.
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }
114
115 pub async fn set_drb_difficulty_selector(
117 &self,
118 drb_difficulty_selector: DrbDifficultySelectorFn,
119 ) {
120 let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;
121
122 *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
123 }
124
125 pub async fn membership_for_epoch(
128 &self,
129 maybe_epoch: Option<EpochNumber>,
130 ) -> Result<EpochMembership<TYPES>> {
131 let ret_val = EpochMembership {
132 epoch: maybe_epoch,
133 coordinator: self.clone(),
134 };
135 let Some(epoch) = maybe_epoch else {
136 return Ok(ret_val);
137 };
138 if self
139 .membership
140 .read()
141 .await
142 .has_randomized_stake_table(epoch)
143 .map_err(|e| {
144 error!(
145 "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
146 {e}"
147 )
148 })?
149 {
150 return Ok(ret_val);
151 }
152 if self.catchup_map.lock().await.contains_key(&epoch) {
153 return Err(warn!(
154 "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
155 progress"
156 ));
157 }
158 let coordinator = self.clone();
159 let (tx, rx) = broadcast(1);
160 self.catchup_map.lock().await.insert(epoch, rx.deactivate());
161 spawn_catchup(coordinator, epoch, tx);
162
163 Err(warn!(
164 "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
165 ))
166 }
167
168 pub async fn stake_table_for_epoch(
171 &self,
172 maybe_epoch: Option<EpochNumber>,
173 ) -> Result<EpochMembership<TYPES>> {
174 let ret_val = EpochMembership {
175 epoch: maybe_epoch,
176 coordinator: self.clone(),
177 };
178 let Some(epoch) = maybe_epoch else {
179 return Ok(ret_val);
180 };
181 if self.membership.read().await.has_stake_table(epoch) {
182 return Ok(ret_val);
183 }
184 if self.catchup_map.lock().await.contains_key(&epoch) {
185 return Err(warn!(
186 "Stake table for Epoch {epoch:?} Unavailable. Catch up already in Progress"
187 ));
188 }
189 let coordinator = self.clone();
190 let (tx, rx) = broadcast(1);
191 self.catchup_map.lock().await.insert(epoch, rx.deactivate());
192 spawn_catchup(coordinator, epoch, tx);
193
194 Err(warn!(
195 "Stake table for Epoch {epoch:?} Unavailable. Starting catchup"
196 ))
197 }
198
199 async fn catchup(
209 mut self,
210 epoch: EpochNumber,
211 epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
212 ) {
213 let mut fetch_epochs = vec![];
215
216 let mut try_epoch = EpochNumber::new(epoch.saturating_sub(1));
217 let maybe_first_epoch = self.membership.read().await.first_epoch();
218 let Some(first_epoch) = maybe_first_epoch else {
219 let err = anytrace::error!(
220 "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
221 );
222 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
223 .await;
224 return;
225 };
226
227 loop {
229 let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
230 if has_stake_table {
231 if try_epoch <= EpochNumber::new(epoch.saturating_sub(2)) {
233 break;
234 }
235 try_epoch = EpochNumber::new(try_epoch.saturating_sub(1));
236 } else {
237 if try_epoch <= first_epoch + 1 {
238 let err = anytrace::error!(
239 "We are trying to catchup to an epoch lower than the second epoch! This \
240 means the initial stake table is missing!"
241 );
242 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
243 .await;
244 return;
245 }
246 let mut map_lock = self.catchup_map.lock().await;
248 match map_lock
249 .get(&try_epoch)
250 .map(InactiveReceiver::activate_cloned)
251 {
252 Some(mut rx) => {
253 drop(map_lock);
255 if let Ok(Ok(_)) = rx.recv_direct().await {
256 break;
257 };
258 },
260 _ => {
261 let (mut tx, rx) = broadcast(1);
263 tx.set_overflow(true);
264 map_lock.insert(try_epoch, rx.deactivate());
265 drop(map_lock);
266 fetch_epochs.push((try_epoch, tx));
267 try_epoch = EpochNumber::new(try_epoch.saturating_sub(1));
268 },
269 }
270 };
271 }
272 let epochs = fetch_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>();
273 tracing::warn!("Fetching stake tables for epochs: {epochs:?}");
274
275 while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
277 match self.fetch_stake_table(current_fetch_epoch).await {
278 Ok(_) => {},
279 Err(err) => {
280 fetch_epochs.push((current_fetch_epoch, tx));
281 self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
282 .await;
283 return;
284 },
285 };
286
287 if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
289 epoch: Some(current_fetch_epoch),
290 coordinator: self.clone(),
291 })) {
292 tracing::warn!(
293 "The catchup channel for epoch {} was overflown, dropped message {:?}",
294 current_fetch_epoch,
295 res.map(|em| em.epoch)
296 );
297 }
298
299 self.catchup_map.lock().await.remove(¤t_fetch_epoch);
301 }
302
303 let root_leaf = match self.fetch_stake_table(epoch).await {
304 Ok(root_leaf) => root_leaf,
305 Err(err) => {
306 tracing::error!("Failed to fetch stake table for epoch {epoch:?}: {err:?}");
307 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
308 .await;
309 return;
310 },
311 };
312
313 match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
314 self.membership.clone(),
315 epoch,
316 )
317 .await
318 {
319 Ok(drb_result) => {
320 tracing::warn!(
321 ?drb_result,
322 "DRB result for epoch {epoch:?} retrieved from peers. Updating membership."
323 );
324 self.membership
325 .write()
326 .await
327 .add_drb_result(epoch, drb_result);
328 },
329 Err(err) => {
330 tracing::warn!(
331 "Recalculating missing DRB result for epoch {}. Catchup failed with error: {}",
332 epoch,
333 err
334 );
335
336 let result = self.compute_drb_result(epoch, root_leaf).await;
337
338 log!(result);
339
340 if let Err(err) = result {
341 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
342 .await;
343 }
344 },
345 };
346
347 if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
349 epoch: Some(epoch),
350 coordinator: self.clone(),
351 })) {
352 tracing::warn!(
353 "The catchup channel for epoch {} was overflown, dropped message {:?}",
354 epoch,
355 res.map(|em| em.epoch)
356 );
357 }
358
359 self.catchup_map.lock().await.remove(&epoch);
361 }
362
363 pub async fn wait_for_catchup(&self, epoch: EpochNumber) -> Result<EpochMembership<TYPES>> {
369 let maybe_receiver = self
370 .catchup_map
371 .lock()
372 .await
373 .get(&epoch)
374 .map(InactiveReceiver::activate_cloned);
375 let Some(mut rx) = maybe_receiver else {
376 if self.membership.read().await.has_stake_table(epoch) {
378 return Ok(EpochMembership {
379 epoch: Some(epoch),
380 coordinator: self.clone(),
381 });
382 }
383 return Err(anytrace::error!(
384 "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
385 ));
386 };
387 let Ok(Ok(mem)) = rx.recv_direct().await else {
388 return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
389 };
390 Ok(mem)
391 }
392
393 async fn catchup_cleanup(
400 &mut self,
401 req_epoch: EpochNumber,
402 epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
403 mut cancel_epochs: Vec<EpochSender<TYPES>>,
404 err: Error,
405 ) {
406 cancel_epochs.push((req_epoch, epoch_tx));
408
409 tracing::error!(
410 "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
411 cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
412 );
413 let mut map_lock = self.catchup_map.lock().await;
414 for (epoch, _) in cancel_epochs.iter() {
415 map_lock.remove(epoch);
417 }
418 drop(map_lock);
419 for (cancel_epoch, tx) in cancel_epochs {
420 if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
422 tracing::warn!(
423 "The catchup channel for epoch {} was overflown during cleanup, dropped \
424 message {:?}",
425 cancel_epoch,
426 res.map(|em| em.epoch)
427 );
428 }
429 }
430 }
431
432 async fn fetch_stake_table(&self, epoch: EpochNumber) -> Result<Leaf2<TYPES>> {
446 let root_epoch = EpochNumber::new(epoch.saturating_sub(2));
447 let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
448 return Err(anytrace::error!(
449 "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
450 epoch {root_epoch:?}. This should not happen"
451 ));
452 };
453
454 let Ok(root_leaf) = root_membership.get_epoch_root().await else {
457 return Err(anytrace::error!(
458 "get epoch root leaf failed for epoch {root_epoch:?}"
459 ));
460 };
461
462 Membership::add_epoch_root(
463 Arc::clone(&self.membership),
464 root_leaf.block_header().clone(),
465 )
466 .await
467 .map_err(|e| {
468 anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
469 })?;
470
471 Ok(root_leaf)
472 }
473
474 pub async fn compute_drb_result(
475 &self,
476 epoch: EpochNumber,
477 root_leaf: Leaf2<TYPES>,
478 ) -> Result<DrbResult> {
479 let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
480
481 if drb_calculation_map_lock.contains(&epoch) {
482 return Err(anytrace::debug!(
483 "DRB calculation for epoch {} already in progress",
484 epoch
485 ));
486 } else {
487 drb_calculation_map_lock.insert(epoch);
488 }
489
490 drop(drb_calculation_map_lock);
491
492 let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
493 return Err(anytrace::error!(
494 "Failed to serialize the QC signature for leaf {root_leaf:?}"
495 ));
496 };
497
498 let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
499 else {
500 return Err(anytrace::error!(
501 "The DRB difficulty selector is missing from the epoch membership coordinator. \
502 This node will not be able to spawn any DRB calculation tasks from catchup."
503 ));
504 };
505
506 let drb_difficulty = drb_difficulty_selector(root_leaf.block_header().version()).await;
507
508 let mut drb_seed_input = [0u8; 32];
509
510 if root_leaf.block_header().version() >= DRB_FIX_VERSION {
511 drb_seed_input = Sha256::digest(&drb_seed_input_vec).into();
512 } else {
513 let len = drb_seed_input_vec.len().min(32);
514 drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
515 }
516
517 let drb_input = DrbInput {
518 epoch: *epoch,
519 iteration: 0,
520 value: drb_seed_input,
521 difficulty_level: drb_difficulty,
522 };
523
524 let store_drb_progress_fn = self.store_drb_progress_fn.clone();
525 let load_drb_progress_fn = self.load_drb_progress_fn.clone();
526
527 let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;
528
529 let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
530 drb_calculation_map_lock.remove(&epoch);
531 drop(drb_calculation_map_lock);
532
533 tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
534 if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
535 tracing::warn!("Failed to add drb result to storage: {e}");
536 }
537 self.membership.write().await.add_drb_result(epoch, drb);
538
539 Ok(drb)
540 }
541}
542
543fn spawn_catchup<T: NodeType>(
544 coordinator: EpochMembershipCoordinator<T>,
545 epoch: EpochNumber,
546 epoch_tx: Sender<Result<EpochMembership<T>>>,
547) {
548 tokio::spawn(async move {
549 coordinator.clone().catchup(epoch, epoch_tx).await;
550 });
551}
/// A membership handle scoped to a particular (optional) epoch; all queries
/// are delegated to the shared coordinator.
pub struct EpochMembership<TYPES: NodeType> {
    /// The epoch this handle refers to; `None` when no epoch applies.
    pub epoch: Option<EpochNumber>,
    /// Shared coordinator that owns the underlying membership state.
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}
560
561impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
562 fn clone(&self) -> Self {
563 Self {
564 coordinator: self.coordinator.clone(),
565 epoch: self.epoch,
566 }
567 }
568}
569
570impl<TYPES: NodeType> EpochMembership<TYPES> {
571 pub fn epoch(&self) -> Option<EpochNumber> {
573 self.epoch
574 }
575
576 pub async fn next_epoch(&self) -> Result<Self> {
578 ensure!(
579 self.epoch().is_some(),
580 "No next epoch because epoch is None"
581 );
582 self.coordinator
583 .membership_for_epoch(self.epoch.map(|e| e + 1))
584 .await
585 }
586 pub async fn next_epoch_stake_table(&self) -> Result<Self> {
588 ensure!(
589 self.epoch().is_some(),
590 "No next epoch because epoch is None"
591 );
592 self.coordinator
593 .stake_table_for_epoch(self.epoch.map(|e| e + 1))
594 .await
595 }
596 pub async fn get_new_epoch(&self, epoch: Option<EpochNumber>) -> Result<Self> {
597 self.coordinator.membership_for_epoch(epoch).await
598 }
599
600 async fn get_epoch_root(&self) -> anyhow::Result<Leaf2<TYPES>> {
602 let Some(epoch) = self.epoch else {
603 anyhow::bail!("Cannot get root for None epoch");
604 };
605 <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
606 self.coordinator.membership.clone(),
607 epoch,
608 )
609 .await
610 }
611
612 pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
614 let Some(epoch) = self.epoch else {
615 return Err(anytrace::warn!("Cannot get drb for None epoch"));
616 };
617 <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
618 self.coordinator.membership.clone(),
619 epoch,
620 )
621 .await
622 .wrap()
623 }
624
625 pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
627 self.coordinator
628 .membership
629 .read()
630 .await
631 .stake_table(self.epoch)
632 }
633
634 pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
636 self.coordinator
637 .membership
638 .read()
639 .await
640 .da_stake_table(self.epoch)
641 }
642
643 pub async fn committee_members(
645 &self,
646 view_number: ViewNumber,
647 ) -> BTreeSet<TYPES::SignatureKey> {
648 self.coordinator
649 .membership
650 .read()
651 .await
652 .committee_members(view_number, self.epoch)
653 }
654
655 pub async fn da_committee_members(
657 &self,
658 view_number: ViewNumber,
659 ) -> BTreeSet<TYPES::SignatureKey> {
660 self.coordinator
661 .membership
662 .read()
663 .await
664 .da_committee_members(view_number, self.epoch)
665 }
666
667 pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
670 self.coordinator
671 .membership
672 .read()
673 .await
674 .stake(pub_key, self.epoch)
675 }
676
677 pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
680 self.coordinator
681 .membership
682 .read()
683 .await
684 .da_stake(pub_key, self.epoch)
685 }
686
687 pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
689 self.coordinator
690 .membership
691 .read()
692 .await
693 .has_stake(pub_key, self.epoch)
694 }
695
696 pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
698 self.coordinator
699 .membership
700 .read()
701 .await
702 .has_da_stake(pub_key, self.epoch)
703 }
704
705 pub async fn leader(&self, view: ViewNumber) -> Result<TYPES::SignatureKey> {
713 self.coordinator
714 .membership
715 .read()
716 .await
717 .leader(view, self.epoch)
718 }
719
720 pub async fn lookup_leader(
728 &self,
729 view: ViewNumber,
730 ) -> std::result::Result<
731 TYPES::SignatureKey,
732 <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
733 > {
734 self.coordinator
735 .membership
736 .read()
737 .await
738 .lookup_leader(view, self.epoch)
739 }
740
741 pub async fn total_nodes(&self) -> usize {
743 self.coordinator
744 .membership
745 .read()
746 .await
747 .total_nodes(self.epoch)
748 }
749
750 pub async fn da_total_nodes(&self) -> usize {
752 self.coordinator
753 .membership
754 .read()
755 .await
756 .da_total_nodes(self.epoch)
757 }
758
759 pub async fn success_threshold(&self) -> U256 {
761 self.coordinator
762 .membership
763 .read()
764 .await
765 .success_threshold(self.epoch)
766 }
767
768 pub async fn da_success_threshold(&self) -> U256 {
770 self.coordinator
771 .membership
772 .read()
773 .await
774 .da_success_threshold(self.epoch)
775 }
776
777 pub async fn failure_threshold(&self) -> U256 {
779 self.coordinator
780 .membership
781 .read()
782 .await
783 .failure_threshold(self.epoch)
784 }
785
786 pub async fn upgrade_threshold(&self) -> U256 {
788 self.coordinator
789 .membership
790 .read()
791 .await
792 .upgrade_threshold(self.epoch)
793 }
794
795 pub async fn add_drb_result(&self, drb_result: DrbResult) {
797 if let Some(epoch) = self.epoch() {
798 self.coordinator
799 .membership
800 .write()
801 .await
802 .add_drb_result(epoch, drb_result);
803 }
804 }
805 pub async fn stake_table_hash(
806 &self,
807 ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
808 self.coordinator
809 .membership
810 .read()
811 .await
812 .stake_table_hash(self.epoch?)
813 }
814}