hotshot_types/
epoch_membership.rs

1use std::{
2    collections::{BTreeSet, HashMap, HashSet},
3    sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
8use async_lock::{Mutex, RwLock};
9use committable::Commitment;
10use hotshot_utils::{anytrace::*, *};
11use sha2::{Digest, Sha256};
12use versions::DRB_FIX_VERSION;
13
14use crate::{
15    PeerConfig,
16    data::{EpochNumber, Leaf2, ViewNumber},
17    drb::{DrbDifficultySelectorFn, DrbInput, DrbResult, compute_drb_result},
18    event::Event,
19    stake_table::HSStakeTable,
20    traits::{
21        block_contents::BlockHeader,
22        election::Membership,
23        node_implementation::NodeType,
24        storage::{
25            LoadDrbProgressFn, Storage, StoreDrbProgressFn, StoreDrbResultFn, load_drb_progress_fn,
26            store_drb_progress_fn, store_drb_result_fn,
27        },
28    },
29};
30
/// Map from an epoch to the (inactive) broadcast receiver that resolves once
/// catchup for that epoch completes, successfully or with an error.
type EpochMap<TYPES> = HashMap<EpochNumber, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Set of epochs for which a DRB calculation is currently in progress.
type DrbMap = HashSet<EpochNumber>;

/// An epoch paired with the sender used to announce completion of its catchup.
type EpochSender<TYPES> = (EpochNumber, Sender<Result<EpochMembership<TYPES>>>);
36
/// Struct to Coordinate membership catchup
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in progress attempts at catching up are stored in this map
    /// Any new callers wanting an `EpochMembership` will await on the signal
    /// alerting them the membership is ready.  The first caller for an epoch will
    /// wait for the actual catchup and alert future callers when it's done
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Epochs for which a DRB calculation is currently in progress; used by
    /// `compute_drb_result` to avoid starting the same calculation twice
    drb_calculation_map: Arc<Mutex<DrbMap>>,

    /// Number of blocks in an epoch
    pub epoch_height: u64,

    /// Callback to persist intermediate DRB computation progress
    /// (passed through to `compute_drb_result`)
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback to load previously persisted DRB computation progress
    /// (passed through to `compute_drb_result`)
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback function to store a drb result in storage when one is calculated during catchup
    store_drb_result_fn: StoreDrbResultFn,

    /// Callback function to select a DRB difficulty based on the view number of the seed
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn>>>,
}
63
64impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
65    fn clone(&self) -> Self {
66        Self {
67            membership: Arc::clone(&self.membership),
68            catchup_map: Arc::clone(&self.catchup_map),
69            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
70            epoch_height: self.epoch_height,
71            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
72            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
73            store_drb_result_fn: self.store_drb_result_fn.clone(),
74            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
75        }
76    }
77}
78
79impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
80where
81    Self: Send,
82{
83    /// Create an EpochMembershipCoordinator
84    pub fn new<S: Storage<TYPES>>(
85        membership: Arc<RwLock<TYPES::Membership>>,
86        epoch_height: u64,
87        storage: &S,
88    ) -> Self {
89        Self {
90            membership,
91            catchup_map: Arc::default(),
92            drb_calculation_map: Arc::default(),
93            epoch_height,
94            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
95            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
96            store_drb_result_fn: store_drb_result_fn(storage.clone()),
97            drb_difficulty_selector: Arc::new(RwLock::new(None)),
98        }
99    }
100
101    pub async fn set_external_channel(&mut self, external_channel: Receiver<Event<TYPES>>) {
102        self.membership
103            .write()
104            .await
105            .set_external_channel(external_channel)
106            .await;
107    }
108
    /// Get a reference to the membership
    ///
    /// Callers acquire the read/write lock themselves.
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }
114
115    /// Set the DRB difficulty selector
116    pub async fn set_drb_difficulty_selector(
117        &self,
118        drb_difficulty_selector: DrbDifficultySelectorFn,
119    ) {
120        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;
121
122        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
123    }
124
125    /// Get a Membership for a given Epoch, which is guaranteed to have a randomized stake
126    /// table for the given Epoch
127    pub async fn membership_for_epoch(
128        &self,
129        maybe_epoch: Option<EpochNumber>,
130    ) -> Result<EpochMembership<TYPES>> {
131        let ret_val = EpochMembership {
132            epoch: maybe_epoch,
133            coordinator: self.clone(),
134        };
135        let Some(epoch) = maybe_epoch else {
136            return Ok(ret_val);
137        };
138        if self
139            .membership
140            .read()
141            .await
142            .has_randomized_stake_table(epoch)
143            .map_err(|e| {
144                error!(
145                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
146                     {e}"
147                )
148            })?
149        {
150            return Ok(ret_val);
151        }
152        if self.catchup_map.lock().await.contains_key(&epoch) {
153            return Err(warn!(
154                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
155                 progress"
156            ));
157        }
158        let coordinator = self.clone();
159        let (tx, rx) = broadcast(1);
160        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
161        spawn_catchup(coordinator, epoch, tx);
162
163        Err(warn!(
164            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
165        ))
166    }
167
168    /// Get a Membership for a given Epoch, which is guaranteed to have a stake
169    /// table for the given Epoch
170    pub async fn stake_table_for_epoch(
171        &self,
172        maybe_epoch: Option<EpochNumber>,
173    ) -> Result<EpochMembership<TYPES>> {
174        let ret_val = EpochMembership {
175            epoch: maybe_epoch,
176            coordinator: self.clone(),
177        };
178        let Some(epoch) = maybe_epoch else {
179            return Ok(ret_val);
180        };
181        if self.membership.read().await.has_stake_table(epoch) {
182            return Ok(ret_val);
183        }
184        if self.catchup_map.lock().await.contains_key(&epoch) {
185            return Err(warn!(
186                "Stake table for Epoch {epoch:?} Unavailable. Catch up already in Progress"
187            ));
188        }
189        let coordinator = self.clone();
190        let (tx, rx) = broadcast(1);
191        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
192        spawn_catchup(coordinator, epoch, tx);
193
194        Err(warn!(
195            "Stake table for Epoch {epoch:?} Unavailable. Starting catchup"
196        ))
197    }
198
    /// Catches the membership up to the epoch passed as an argument.
    /// To do this, try to get the stake table for the epoch containing this epoch's root and
    /// the stake table for the epoch containing this epoch's drb result.
    /// If they do not exist, then go one by one back until we find a stake table.
    ///
    /// If there is another catchup in progress this will not duplicate efforts
    /// e.g. if we start with only the first epoch stake table and call catchup for epoch 10, then call catchup for epoch 20
    /// the first caller will actually do the work for to catchup to epoch 10 then the second caller will continue
    /// catching up to epoch 20
    ///
    /// Consumes `self` (a cheap clone of the coordinator) and is intended to run
    /// in its own task (see `spawn_catchup`). On success the resulting
    /// `EpochMembership` is broadcast on `epoch_tx` and the epoch is removed
    /// from `catchup_map`; on failure `catchup_cleanup` notifies all waiters.
    async fn catchup(
        mut self,
        epoch: EpochNumber,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        // We need to fetch the requested epoch, that's for sure
        let mut fetch_epochs = vec![];

        // Walk backwards from the epoch before the requested one until we hit an
        // epoch whose stake table we already have.
        let mut try_epoch = EpochNumber::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                .await;
            return;
        };

        // First figure out which epochs we need to fetch
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We have this stake table but we need to make sure we have the epoch root of the requested epoch
                if try_epoch <= EpochNumber::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = EpochNumber::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catchup to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                    return;
                }
                // Lock the catchup map
                let mut map_lock = self.catchup_map.lock().await;
                match map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    Some(mut rx) => {
                        // Somebody else is already fetching this epoch, drop the lock and wait for them to finish
                        drop(map_lock);
                        if let Ok(Ok(_)) = rx.recv_direct().await {
                            break;
                        };
                        // If we didn't receive the epoch then we need to try again
                    },
                    _ => {
                        // Nobody else is fetching this epoch. We need to do it. Put it in the map and move on to the next epoch
                        let (mut tx, rx) = broadcast(1);
                        tx.set_overflow(true);
                        map_lock.insert(try_epoch, rx.deactivate());
                        drop(map_lock);
                        fetch_epochs.push((try_epoch, tx));
                        try_epoch = EpochNumber::new(try_epoch.saturating_sub(1));
                    },
                }
            };
        }
        let epochs = fetch_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>();
        tracing::warn!("Fetching stake tables for epochs: {epochs:?}");

        // Iterate through the epochs we need to fetch in reverse, i.e. from the oldest to the newest
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(_) => {},
                Err(err) => {
                    // Push the failed epoch back so catchup_cleanup notifies its waiters too
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
                        .await;
                    return;
                },
            };

            // Signal the other tasks about the success
            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} was overflown, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            // Remove the epoch from the catchup map to indicate that the catchup is complete
            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }

        // Finally fetch the stake table for the requested epoch itself; its root
        // leaf seeds the DRB computation below if the result isn't fetchable.
        let root_leaf = match self.fetch_stake_table(epoch).await {
            Ok(root_leaf) => root_leaf,
            Err(err) => {
                tracing::error!("Failed to fetch stake table for epoch {epoch:?}: {err:?}");
                self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                    .await;
                return;
            },
        };

        // Prefer retrieving the DRB result from peers; fall back to computing it locally.
        match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.membership.clone(),
            epoch,
        )
        .await
        {
            Ok(drb_result) => {
                tracing::warn!(
                    ?drb_result,
                    "DRB result for epoch {epoch:?} retrieved from peers. Updating membership."
                );
                self.membership
                    .write()
                    .await
                    .add_drb_result(epoch, drb_result);
            },
            Err(err) => {
                tracing::warn!(
                    "Recalculating missing DRB result for epoch {}. Catchup failed with error: {}",
                    epoch,
                    err
                );

                let result = self.compute_drb_result(epoch, root_leaf).await;

                log!(result);

                if let Err(err) = result {
                    // NOTE(review): there is no `return` here, so after cleanup we
                    // still fall through and broadcast a success below — confirm
                    // this is intended (e.g. for the "DRB calculation already in
                    // progress" error case).
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                }
            },
        };

        // Signal the other tasks about the success
        if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
            epoch: Some(epoch),
            coordinator: self.clone(),
        })) {
            tracing::warn!(
                "The catchup channel for epoch {} was overflown, dropped message {:?}",
                epoch,
                res.map(|em| em.epoch)
            );
        }

        // Remove the epoch from the catchup map to indicate that the catchup is complete
        self.catchup_map.lock().await.remove(&epoch);
    }
362
363    /// Call this method if you think catchup is in progress for a given epoch
364    /// and you want to wait for it to finish and get the stake table.
365    /// If it's not, it will try to return the stake table if already available.
366    /// Returns an error if the catchup failed or the catchup is not in progress
367    /// and the stake table is not available.
368    pub async fn wait_for_catchup(&self, epoch: EpochNumber) -> Result<EpochMembership<TYPES>> {
369        let maybe_receiver = self
370            .catchup_map
371            .lock()
372            .await
373            .get(&epoch)
374            .map(InactiveReceiver::activate_cloned);
375        let Some(mut rx) = maybe_receiver else {
376            // There is no catchup in progress, maybe the epoch is already finalized
377            if self.membership.read().await.has_stake_table(epoch) {
378                return Ok(EpochMembership {
379                    epoch: Some(epoch),
380                    coordinator: self.clone(),
381                });
382            }
383            return Err(anytrace::error!(
384                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
385            ));
386        };
387        let Ok(Ok(mem)) = rx.recv_direct().await else {
388            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
389        };
390        Ok(mem)
391    }
392
393    /// Clean up after a failed catchup attempt.
394    ///
395    /// This method is called when a catchup attempt fails. It cleans up the state of the
396    /// `EpochMembershipCoordinator` by removing the failed epochs from the
397    /// `catchup_map` and broadcasting the error to any tasks that are waiting for the
398    /// catchup to complete.
399    async fn catchup_cleanup(
400        &mut self,
401        req_epoch: EpochNumber,
402        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
403        mut cancel_epochs: Vec<EpochSender<TYPES>>,
404        err: Error,
405    ) {
406        // Cleanup in case of error
407        cancel_epochs.push((req_epoch, epoch_tx));
408
409        tracing::error!(
410            "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
411            cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
412        );
413        let mut map_lock = self.catchup_map.lock().await;
414        for (epoch, _) in cancel_epochs.iter() {
415            // Remove the failed epochs from the catchup map
416            map_lock.remove(epoch);
417        }
418        drop(map_lock);
419        for (cancel_epoch, tx) in cancel_epochs {
420            // Signal the other tasks about the failures
421            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
422                tracing::warn!(
423                    "The catchup channel for epoch {} was overflown during cleanup, dropped \
424                     message {:?}",
425                    cancel_epoch,
426                    res.map(|em| em.epoch)
427                );
428            }
429        }
430    }
431
432    /// A helper method to the `catchup` method.
433    ///
434    /// It tries to fetch the requested stake table from the root epoch,
435    /// and updates the membership accordingly.
436    ///
437    /// # Arguments
438    ///
439    /// * `epoch` - The epoch for which to fetch the stake table.
440    ///
441    /// # Returns
442    ///
443    /// * `Ok(Leaf2<TYPES>)` containing the epoch root leaf if successful.
444    /// * `Err(Error)` if the root membership or root leaf cannot be found, or if updating the membership fails.
445    async fn fetch_stake_table(&self, epoch: EpochNumber) -> Result<Leaf2<TYPES>> {
446        let root_epoch = EpochNumber::new(epoch.saturating_sub(2));
447        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
448            return Err(anytrace::error!(
449                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
450                 epoch {root_epoch:?}. This should not happen"
451            ));
452        };
453
454        // Get the epoch root headers and update our membership with them, finally sync them
455        // Verification of the root is handled in get_epoch_root_and_drb
456        let Ok(root_leaf) = root_membership.get_epoch_root().await else {
457            return Err(anytrace::error!(
458                "get epoch root leaf failed for epoch {root_epoch:?}"
459            ));
460        };
461
462        Membership::add_epoch_root(
463            Arc::clone(&self.membership),
464            root_leaf.block_header().clone(),
465        )
466        .await
467        .map_err(|e| {
468            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
469        })?;
470
471        Ok(root_leaf)
472    }
473
474    pub async fn compute_drb_result(
475        &self,
476        epoch: EpochNumber,
477        root_leaf: Leaf2<TYPES>,
478    ) -> Result<DrbResult> {
479        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
480
481        if drb_calculation_map_lock.contains(&epoch) {
482            return Err(anytrace::debug!(
483                "DRB calculation for epoch {} already in progress",
484                epoch
485            ));
486        } else {
487            drb_calculation_map_lock.insert(epoch);
488        }
489
490        drop(drb_calculation_map_lock);
491
492        let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
493            return Err(anytrace::error!(
494                "Failed to serialize the QC signature for leaf {root_leaf:?}"
495            ));
496        };
497
498        let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
499        else {
500            return Err(anytrace::error!(
501                "The DRB difficulty selector is missing from the epoch membership coordinator. \
502                 This node will not be able to spawn any DRB calculation tasks from catchup."
503            ));
504        };
505
506        let drb_difficulty = drb_difficulty_selector(root_leaf.block_header().version()).await;
507
508        let mut drb_seed_input = [0u8; 32];
509
510        if root_leaf.block_header().version() >= DRB_FIX_VERSION {
511            drb_seed_input = Sha256::digest(&drb_seed_input_vec).into();
512        } else {
513            let len = drb_seed_input_vec.len().min(32);
514            drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
515        }
516
517        let drb_input = DrbInput {
518            epoch: *epoch,
519            iteration: 0,
520            value: drb_seed_input,
521            difficulty_level: drb_difficulty,
522        };
523
524        let store_drb_progress_fn = self.store_drb_progress_fn.clone();
525        let load_drb_progress_fn = self.load_drb_progress_fn.clone();
526
527        let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;
528
529        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
530        drb_calculation_map_lock.remove(&epoch);
531        drop(drb_calculation_map_lock);
532
533        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
534        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
535            tracing::warn!("Failed to add drb result to storage: {e}");
536        }
537        self.membership.write().await.add_drb_result(epoch, drb);
538
539        Ok(drb)
540    }
541}
542
543fn spawn_catchup<T: NodeType>(
544    coordinator: EpochMembershipCoordinator<T>,
545    epoch: EpochNumber,
546    epoch_tx: Sender<Result<EpochMembership<T>>>,
547) {
548    tokio::spawn(async move {
549        coordinator.clone().catchup(epoch, epoch_tx).await;
550    });
551}
/// Wrapper around a membership that guarantees that the epoch
/// has a stake table
pub struct EpochMembership<TYPES: NodeType> {
    /// Epoch the `membership` is guaranteed to have a stake table for
    /// (`None` when this membership is not tied to a specific epoch)
    pub epoch: Option<EpochNumber>,
    /// Underlying membership
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}
560
561impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
562    fn clone(&self) -> Self {
563        Self {
564            coordinator: self.coordinator.clone(),
565            epoch: self.epoch,
566        }
567    }
568}
569
570impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is good for
    ///
    /// Returns `None` when this membership is not tied to a specific epoch.
    pub fn epoch(&self) -> Option<EpochNumber> {
        self.epoch
    }
575
    /// Get a membership for the next epoch
    ///
    /// # Errors
    /// Returns an error if `self.epoch` is `None`, or propagates the error from
    /// `membership_for_epoch` (e.g. the randomized stake table is unavailable
    /// and a catchup was started).
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }
    /// Get a membership for the next epoch, requiring only a (non-randomized)
    /// stake table for that epoch
    ///
    /// # Errors
    /// Returns an error if `self.epoch` is `None`, or propagates the error from
    /// `stake_table_for_epoch`.
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }
    /// Get a membership for an arbitrary epoch (possibly `None`) via the coordinator
    pub async fn get_new_epoch(&self, epoch: Option<EpochNumber>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }
599
600    /// Wraps the same named Membership trait fn
601    async fn get_epoch_root(&self) -> anyhow::Result<Leaf2<TYPES>> {
602        let Some(epoch) = self.epoch else {
603            anyhow::bail!("Cannot get root for None epoch");
604        };
605        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
606            self.coordinator.membership.clone(),
607            epoch,
608        )
609        .await
610    }
611
612    /// Wraps the same named Membership trait fn
613    pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
614        let Some(epoch) = self.epoch else {
615            return Err(anytrace::warn!("Cannot get drb for None epoch"));
616        };
617        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
618            self.coordinator.membership.clone(),
619            epoch,
620        )
621        .await
622        .wrap()
623    }
624
625    /// Get all participants in the committee (including their stake) for a specific epoch
626    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
627        self.coordinator
628            .membership
629            .read()
630            .await
631            .stake_table(self.epoch)
632    }
633
634    /// Get all participants in the committee (including their stake) for a specific epoch
635    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
636        self.coordinator
637            .membership
638            .read()
639            .await
640            .da_stake_table(self.epoch)
641    }
642
643    /// Get all participants in the committee for a specific view for a specific epoch
644    pub async fn committee_members(
645        &self,
646        view_number: ViewNumber,
647    ) -> BTreeSet<TYPES::SignatureKey> {
648        self.coordinator
649            .membership
650            .read()
651            .await
652            .committee_members(view_number, self.epoch)
653    }
654
655    /// Get all participants in the committee for a specific view for a specific epoch
656    pub async fn da_committee_members(
657        &self,
658        view_number: ViewNumber,
659    ) -> BTreeSet<TYPES::SignatureKey> {
660        self.coordinator
661            .membership
662            .read()
663            .await
664            .da_committee_members(view_number, self.epoch)
665    }
666
667    /// Get the stake table entry for a public key, returns `None` if the
668    /// key is not in the table for a specific epoch
669    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
670        self.coordinator
671            .membership
672            .read()
673            .await
674            .stake(pub_key, self.epoch)
675    }
676
677    /// Get the DA stake table entry for a public key, returns `None` if the
678    /// key is not in the table for a specific epoch
679    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
680        self.coordinator
681            .membership
682            .read()
683            .await
684            .da_stake(pub_key, self.epoch)
685    }
686
687    /// See if a node has stake in the committee in a specific epoch
688    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
689        self.coordinator
690            .membership
691            .read()
692            .await
693            .has_stake(pub_key, self.epoch)
694    }
695
696    /// See if a node has stake in the committee in a specific epoch
697    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
698        self.coordinator
699            .membership
700            .read()
701            .await
702            .has_da_stake(pub_key, self.epoch)
703    }
704
705    /// The leader of the committee for view `view_number` in `epoch`.
706    ///
707    /// Note: this function uses a HotShot-internal error type.
708    /// You should implement `lookup_leader`, rather than implementing this function directly.
709    ///
710    /// # Errors
711    /// Returns an error if the leader cannot be calculated.
712    pub async fn leader(&self, view: ViewNumber) -> Result<TYPES::SignatureKey> {
713        self.coordinator
714            .membership
715            .read()
716            .await
717            .leader(view, self.epoch)
718    }
719
720    /// The leader of the committee for view `view_number` in `epoch`.
721    ///
722    /// Note: There is no such thing as a DA leader, so any consumer
723    /// requiring a leader should call this.
724    ///
725    /// # Errors
726    /// Returns an error if the leader cannot be calculated
727    pub async fn lookup_leader(
728        &self,
729        view: ViewNumber,
730    ) -> std::result::Result<
731        TYPES::SignatureKey,
732        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
733    > {
734        self.coordinator
735            .membership
736            .read()
737            .await
738            .lookup_leader(view, self.epoch)
739    }
740
741    /// Returns the number of total nodes in the committee in an epoch `epoch`
742    pub async fn total_nodes(&self) -> usize {
743        self.coordinator
744            .membership
745            .read()
746            .await
747            .total_nodes(self.epoch)
748    }
749
750    /// Returns the number of total DA nodes in the committee in an epoch `epoch`
751    pub async fn da_total_nodes(&self) -> usize {
752        self.coordinator
753            .membership
754            .read()
755            .await
756            .da_total_nodes(self.epoch)
757    }
758
759    /// Returns the threshold for a specific `Membership` implementation
760    pub async fn success_threshold(&self) -> U256 {
761        self.coordinator
762            .membership
763            .read()
764            .await
765            .success_threshold(self.epoch)
766    }
767
768    /// Returns the DA threshold for a specific `Membership` implementation
769    pub async fn da_success_threshold(&self) -> U256 {
770        self.coordinator
771            .membership
772            .read()
773            .await
774            .da_success_threshold(self.epoch)
775    }
776
777    /// Returns the threshold for a specific `Membership` implementation
778    pub async fn failure_threshold(&self) -> U256 {
779        self.coordinator
780            .membership
781            .read()
782            .await
783            .failure_threshold(self.epoch)
784    }
785
786    /// Returns the threshold required to upgrade the network protocol
787    pub async fn upgrade_threshold(&self) -> U256 {
788        self.coordinator
789            .membership
790            .read()
791            .await
792            .upgrade_threshold(self.epoch)
793    }
794
795    /// Add the epoch result to the membership
796    pub async fn add_drb_result(&self, drb_result: DrbResult) {
797        if let Some(epoch) = self.epoch() {
798            self.coordinator
799                .membership
800                .write()
801                .await
802                .add_drb_result(epoch, drb_result);
803        }
804    }
805    pub async fn stake_table_hash(
806        &self,
807    ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
808        self.coordinator
809            .membership
810            .read()
811            .await
812            .stake_table_hash(self.epoch?)
813    }
814}