Skip to main content

espresso_types/v0/impls/
committee.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    ops::Bound,
4    sync::Arc,
5};
6
7use alloy::primitives::{Address, U256};
8use anyhow::{Context, bail};
9use async_lock::Mutex as AsyncMutex;
10use committable::Commitment;
11use hotshot::types::{BLSPubKey, SignatureKey as _};
12use hotshot_types::{
13    PeerConfig, PeerConnectInfo,
14    data::{BlockNumber, EpochNumber, ViewNumber},
15    drb::{
16        DrbResult,
17        election::{RandomizedCommittee, generate_stake_cdf, select_randomized_leader},
18    },
19    epoch_membership::EpochMembershipCoordinator,
20    stake_table::{HSStakeTable, StakeTableEntry},
21    traits::{
22        block_contents::BlockHeader,
23        election::{Membership, MembershipSnapshot, NonEpochMembershipSnapshot},
24        signature_key::StakeTableEntryType,
25    },
26    utils::{
27        epoch_from_block_number, is_epoch_root, root_block_in_epoch, transition_block_for_epoch,
28    },
29};
30use indexmap::IndexMap;
31use parking_lot::RwLock;
32use thiserror::Error;
33use tracing::{debug, error, info, warn};
34use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
35
36use super::{
37    AuthenticatedValidatorMap, RegisteredValidatorMap, StakeTableHash, StakeTableState,
38    compute_block_reward,
39};
40use crate::{
41    Header, Leaf2, PubKey, SeqTypes,
42    traits::StateCatchup,
43    v0_3::{ASSUMED_BLOCK_TIME_SECONDS, AuthenticatedValidator, Fetcher, RewardAmount},
44};
45
46/// Type to describe DA and Stake memberships.
47#[derive(Clone, Debug)]
48pub struct EpochCommittees {
49    inner: Arc<RwLock<Inner>>,
50    fetcher: Arc<Fetcher>,
51    update_fixed_block_reward_lock: Arc<AsyncMutex<()>>,
52    epoch_height: BlockNumber,
53}
54
55#[derive(Debug)]
56struct Inner {
57    /// Captured pre-epoch view.
58    ///
59    /// Built once at constructor time. `add_da_committee` operates on the
60    /// per-epoch `da_committees` map and does not modify the pre-epoch view.
61    non_epoch_snapshot: NonEpochSnapshot,
62
63    /// Per-epoch snapshots.
64    ///
65    /// Each mutator rebuilds the snapshot for its affected epoch(s).
66    snapshots: BTreeMap<EpochNumber, EpochSnapshot>,
67
68    /// Holds the full validator candidate sets temporarily, until we store them.
69    all_validators: BTreeMap<EpochNumber, RegisteredValidatorMap>,
70
71    /// DA committees, indexed by the first epoch in which they apply.
72    ///
73    /// Kept separate from `snapshots` because the lookup is a range query.
74    da_committees: BTreeMap<EpochNumber, Arc<DaCommittee>>,
75
76    first_epoch: Option<EpochNumber>,
77
78    /// Fixed block reward (used only in V3).
79    ///
80    /// Starting from V4, block reward is dynamic
81    fixed_block_reward: Option<RewardAmount>,
82}
83
84#[derive(Clone, Debug)]
85struct DaCommittee {
86    committee: Vec<PeerConfig<SeqTypes>>,
87    indexed_committee: HashMap<PubKey, PeerConfig<SeqTypes>>,
88}
89
90/// Pre-epoch stake-table state.
91#[derive(Debug)]
92struct NonEpochCommittee {
93    /// The nodes eligible for leadership.
94    ///
95    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
96    /// leader but without voting rights.
97    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
98
99    /// Keys for nodes participating in the network
100    stake_table: Vec<PeerConfig<SeqTypes>>,
101
102    /// Stake entries indexed by public key, for efficient lookup.
103    indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,
104}
105
106/// Holds Stake table and da stake
107#[derive(Debug)]
108struct EpochCommittee {
109    /// The nodes eligible for leadership.
110    ///
111    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
112    /// leader but without voting rights.
113    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
114    /// Keys for nodes participating in the network
115    stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
116    validators: AuthenticatedValidatorMap,
117    address_mapping: HashMap<BLSPubKey, Address>,
118    block_reward: Option<RewardAmount>,
119    stake_table_hash: Option<StakeTableHash>,
120    header: Option<Header>,
121}
122
123impl EpochCommittee {
124    fn new(
125        validators: AuthenticatedValidatorMap,
126        block_reward: Option<RewardAmount>,
127        hash: Option<StakeTableHash>,
128        header: Option<Header>,
129    ) -> Self {
130        let mut address_mapping = HashMap::new();
131        let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
132            .values()
133            .map(|v| {
134                address_mapping.insert(v.stake_table_key, v.account);
135                (
136                    v.stake_table_key,
137                    PeerConfig {
138                        stake_table_entry: BLSPubKey::stake_table_entry(
139                            &v.stake_table_key,
140                            v.stake,
141                        ),
142                        state_ver_key: v.state_ver_key.clone(),
143                        connect_info: v.x25519_key.and_then(|p| {
144                            let a = v.p2p_addr.clone()?;
145                            Some(PeerConnectInfo {
146                                x25519_key: p,
147                                p2p_addr: a,
148                            })
149                        }),
150                    },
151                )
152            })
153            .collect();
154
155        let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
156            stake_table.iter().map(|(_, l)| l.clone()).collect();
157
158        Self {
159            eligible_leaders,
160            stake_table,
161            validators,
162            address_mapping,
163            block_reward,
164            stake_table_hash: hash,
165            header,
166        }
167    }
168}
169
170impl EpochCommittees {
171    pub fn epoch_height(&self) -> BlockNumber {
172        self.epoch_height
173    }
174
175    pub fn first_epoch(&self) -> Option<EpochNumber> {
176        self.inner.read().first_epoch
177    }
178
179    pub fn fetcher(&self) -> &Fetcher {
180        &self.fetcher
181    }
182
183    pub fn fixed_block_reward(&self) -> Option<RewardAmount> {
184        self.inner.read().fixed_block_reward
185    }
186
187    /// Find the most recent stake-table entry for `key`.
188    ///
189    /// Scanns loaded epochs from highest to lowest and falling back to the
190    /// genesis bootstrap committee.
191    pub fn latest_peer_config(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
192        let inner = self.inner.read();
193        for snap in inner.snapshots.values().rev() {
194            if let Some(cfg) = snap.inner.committee.stake_table.get(key) {
195                return Some(cfg.clone());
196            }
197        }
198        inner
199            .non_epoch_snapshot
200            .inner
201            .committee
202            .indexed_stake_table
203            .get(key)
204            .cloned()
205    }
206
207    /// Fetch the fixed block reward and update it if its None.
208    /// We used a fixed block reward for version v3
209    /// Version v4 uses the dynamic block reward
210    /// Assumes the stake table contract proxy address does not change
211    async fn fetch_and_update_fixed_block_reward(
212        &self,
213        epoch: EpochNumber,
214    ) -> anyhow::Result<RewardAmount> {
215        // Ensure there is only one `fetch_and_update_fixed_block_reward` at a time:
216        let _guard = self.update_fixed_block_reward_lock.lock().await;
217
218        // Clippy claims "temporary with significant `Drop` in `if let`
219        // scrutinee will live until the end of the `if let` expression",
220        // however this is incorrect. The 2024 edition changed the drop
221        // scope of `if-let` expressions:
222        //
223        // https://doc.rust-lang.org/edition-guide/rust-2024/temporary-if-let-scope.html
224        //
225        // The read guard is dropped before `else`.
226        #[allow(clippy::significant_drop_in_scrutinee)]
227        if let Some(reward) = self.inner.read().fixed_block_reward {
228            Ok(reward)
229        } else {
230            warn!(%epoch,
231                "Block reward is None. attempting to fetch it from L1",
232            );
233            let block_reward =
234                self.fetcher
235                    .fetch_fixed_block_reward()
236                    .await
237                    .inspect_err(|err| {
238                        error!(?epoch, ?err, "failed to fetch block_reward");
239                    })?;
240            self.inner.write().fixed_block_reward = Some(block_reward);
241            Ok(block_reward)
242        }
243    }
244
245    /// Calculates the dynamic block reward for a given block header within an epoch.
246    ///
247    /// The reward is based on a dynamic inflation rate computed from the current stake ratio (p),
248    /// where `p = total_stake / total_supply`. The inflation function R(p) is defined piecewise:
249    /// - If `p <= 0.01`: R(p) = 0.03 / sqrt(2 * 0.01)
250    /// - Else: R(p) = 0.03 / sqrt(2 * p)
251    async fn calculate_dynamic_block_reward(
252        &self,
253        epoch: EpochNumber,
254        header: &Header,
255        validators: &AuthenticatedValidatorMap,
256    ) -> anyhow::Result<Option<RewardAmount>> {
257        let epoch_height = self.epoch_height;
258        let current_epoch = epoch_from_block_number(header.height(), *epoch_height);
259        let previous_epoch = current_epoch
260            .checked_sub(1)
261            .context("underflow: cannot get previous epoch when current_epoch is 0")?;
262        debug!(?epoch, "previous_epoch={previous_epoch:?}");
263
264        let first_epoch = *self.first_epoch().context("first epoch is None")?;
265
266        // Return early if previous epoch is not the first two epochs
267        // and we don't have the stake table.
268        if previous_epoch > first_epoch + 1 && self.snapshot(previous_epoch.into()).is_none() {
269            warn!(?previous_epoch, "missing stake table for previous epoch");
270            return Ok(None);
271        }
272
273        let previous_reward_distributed = header
274            .total_reward_distributed()
275            .context("Invalid block header: missing total_reward_distributed field")?;
276
277        // Calculate total stake across all active validators
278        let total_stake: U256 = validators.values().map(|v| v.stake).sum();
279        let initial_supply = *self.fetcher.initial_supply.read().await;
280        let initial_supply = match initial_supply {
281            Some(supply) => supply,
282            None => self.fetcher.fetch_and_update_initial_supply().await?,
283        };
284        let total_supply = initial_supply
285            .checked_add(previous_reward_distributed.0)
286            .context("initial_supply + previous_reward_distributed overflow")?;
287
288        // Calculate average block time over the last epoch
289        let curr_ts = header.timestamp_millis_internal();
290        debug!(?epoch, "curr_ts={curr_ts:?}");
291
292        // If the node starts from epoch version V4, there is no previous epoch root available.
293        // In this case, we assume a fixed average block time of 2000 milli seconds (2s)
294        // for the first epoch in which reward id distributed
295        let average_block_time_ms = if previous_epoch <= first_epoch + 1 {
296            ASSUMED_BLOCK_TIME_SECONDS as u64 * 1000 // 2 seconds in milliseconds
297        } else {
298            // We are calculating rewards for epoch `epoch`, so the current epoch should be `epoch - 2`.
299            // We need to calculate the average block time for the current epoch, so we need to know
300            // the previous epoch root which is stored with epoch `epoch - 1`, i.e. the next epoch.
301            let next_epoch = epoch
302                .checked_sub(1)
303                .context("underflow: cannot get next epoch when epoch is 0")?;
304            let prev_ts = match self.map_header(next_epoch, |h| h.timestamp_millis_internal()) {
305                Some(ts) => ts,
306                None => {
307                    info!(
308                        "Calculating rewards for epoch {}, we have no root leaf header for epoch \
309                         - 1. Fetching from peers",
310                        epoch
311                    );
312
313                    let root_height = header.height().checked_sub(*epoch_height).context(
314                        "Epoch height is greater than block height. cannot compute previous epoch \
315                         root height",
316                    )?;
317
318                    let prev_snapshot = self
319                        .snapshot(EpochNumber::new(previous_epoch))
320                        .context("Stake table not found")?;
321                    let prev_stake_table =
322                        HSStakeTable(prev_snapshot.stake_table().cloned().collect());
323                    let success_threshold = prev_snapshot.success_threshold();
324
325                    self.fetcher
326                        .peers
327                        .fetch_leaf(root_height, prev_stake_table, success_threshold)
328                        .await
329                        .context("Epoch root leaf not found")?
330                        .block_header()
331                        .timestamp_millis_internal()
332                },
333            };
334
335            let time_diff = curr_ts.checked_sub(prev_ts).context(
336                "Current timestamp is earlier than previous. underflow in block time calculation",
337            )?;
338
339            time_diff
340                .checked_div(*epoch_height)
341                .context("Epoch height is zero. cannot compute average block time")?
342        };
343        info!(?epoch, %total_supply, %total_stake, %average_block_time_ms,
344                       "dynamic block reward parameters");
345
346        let block_reward =
347            compute_block_reward(epoch, total_supply, total_stake, average_block_time_ms)?;
348
349        Ok(Some(block_reward))
350    }
351
352    /// This function just returns the stored block reward in epoch committee
353    pub fn epoch_block_reward(&self, epoch: EpochNumber) -> Option<RewardAmount> {
354        self.inner
355            .read()
356            .epoch_committee(epoch)
357            .and_then(|committee| committee.block_reward)
358    }
359
360    /// Get the index of a validator's BLS key in the epoch's stake table.
361    /// Returns None if the validator is not in the stake table for this epoch.
362    ///
363    /// The index corresponds to the position in the `leader_counts` array in V6 headers.
364    pub fn get_validator_index(&self, epoch: EpochNumber, bls_key: &PubKey) -> Option<usize> {
365        self.inner
366            .read()
367            .epoch_committee(epoch)
368            .and_then(|committee| committee.stake_table.get_index_of(bls_key))
369    }
370
371    pub fn active_validators(&self, e: EpochNumber) -> anyhow::Result<AuthenticatedValidatorMap> {
372        self.inner.read().active_validators(e)
373    }
374
375    pub fn address(&self, e: EpochNumber, key: &BLSPubKey) -> anyhow::Result<Address> {
376        self.inner.read().address(e, key)
377    }
378
379    pub fn get_validator_config(
380        &self,
381        epoch: EpochNumber,
382        key: &BLSPubKey,
383    ) -> anyhow::Result<AuthenticatedValidator<BLSPubKey>> {
384        let inner = self.inner.read();
385        let address = inner.address(epoch, key)?;
386        let validators = inner.active_validators(epoch)?;
387        validators
388            .get(&address)
389            .context("validator not found")
390            .cloned()
391    }
392
393    // We need a constructor to match our concrete type.
394    pub fn new_stake<B: Into<BlockNumber>>(
395        // TODO remove `new` from trait and rename this to `new`.
396        // https://github.com/EspressoSystems/HotShot/commit/fcb7d54a4443e29d643b3bbc53761856aef4de8b
397        committee_members: Vec<PeerConfig<SeqTypes>>,
398        da_members: Vec<PeerConfig<SeqTypes>>,
399        fixed_block_reward: Option<RewardAmount>,
400        fetcher: Fetcher,
401        epoch_height: B,
402    ) -> Self {
403        // For each member, get the stake table entry
404        let stake_table: Vec<_> = committee_members
405            .iter()
406            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
407            .cloned()
408            .collect();
409
410        let eligible_leaders = stake_table.clone();
411        // For each member, get the stake table entry
412        let da_members: Vec<_> = da_members
413            .iter()
414            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
415            .cloned()
416            .collect();
417
418        // Index the stake table by public key
419        let indexed_stake_table: HashMap<PubKey, _> = stake_table
420            .iter()
421            .map(|peer_config| {
422                (
423                    PubKey::public_key(&peer_config.stake_table_entry),
424                    peer_config.clone(),
425                )
426            })
427            .collect();
428
429        // Index the stake table by public key
430        let indexed_da_members: HashMap<PubKey, _> = da_members
431            .iter()
432            .map(|peer_config| {
433                (
434                    PubKey::public_key(&peer_config.stake_table_entry),
435                    peer_config.clone(),
436                )
437            })
438            .collect();
439
440        let da_committee = Arc::new(DaCommittee {
441            committee: da_members,
442            indexed_committee: indexed_da_members,
443        });
444
445        let members = Arc::new(NonEpochCommittee {
446            eligible_leaders,
447            stake_table,
448            indexed_stake_table,
449        });
450
451        let non_epoch_snapshot = NonEpochSnapshot::new(members.clone(), da_committee.clone());
452
453        let epoch_committee = Arc::new(EpochCommittee {
454            eligible_leaders: members.eligible_leaders.clone(),
455            stake_table: members
456                .stake_table
457                .iter()
458                .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
459                .collect(),
460            validators: Default::default(),
461            address_mapping: HashMap::new(),
462            block_reward: Default::default(),
463            stake_table_hash: None,
464            header: None,
465        });
466
467        let mut snapshots = BTreeMap::new();
468        snapshots.insert(
469            EpochNumber::genesis(),
470            EpochSnapshot::new(
471                EpochNumber::genesis(),
472                None,
473                epoch_committee.clone(),
474                None,
475                da_committee.clone(),
476            ),
477        );
478        // TODO: remove this, workaround for hotshot asking for stake tables from epoch 1
479        snapshots.insert(
480            EpochNumber::genesis() + 1u64,
481            EpochSnapshot::new(
482                EpochNumber::genesis() + 1u64,
483                None,
484                epoch_committee,
485                None,
486                da_committee,
487            ),
488        );
489
490        Self {
491            inner: Arc::new(RwLock::new(Inner {
492                non_epoch_snapshot,
493                da_committees: BTreeMap::new(),
494                snapshots,
495                all_validators: BTreeMap::new(),
496                first_epoch: None,
497                fixed_block_reward,
498            })),
499            fetcher: Arc::new(fetcher),
500            update_fixed_block_reward_lock: Arc::new(AsyncMutex::new(())),
501            epoch_height: epoch_height.into(),
502        }
503    }
504
505    pub async fn reload_stake(&mut self, limit: u64) {
506        match self.fetcher.fetch_fixed_block_reward().await {
507            Ok(block_reward) => {
508                info!("Fetched block reward: {block_reward}");
509                self.inner.write().fixed_block_reward = Some(block_reward);
510            },
511            Err(err) => {
512                warn!("Failed to fetch the block reward when reloading the stake tables: {err}");
513            },
514        }
515
516        // Load the 50 latest stored stake tables
517        let loaded_stake = match self
518            .fetcher
519            .persistence
520            .lock()
521            .await
522            .load_latest_stake(limit)
523            .await
524        {
525            Ok(Some(loaded)) => loaded,
526            Ok(None) => {
527                warn!("No stake table history found in persistence!");
528                return;
529            },
530            Err(e) => {
531                error!("Failed to load stake table history from persistence: {e}");
532                return;
533            },
534        };
535
536        for (epoch, (validators, block_reward), stake_table_hash) in loaded_stake {
537            let committee = EpochCommittee::new(validators, block_reward, stake_table_hash, None);
538            self.inner
539                .write()
540                .put_epoch_committee(epoch, Arc::new(committee));
541        }
542    }
543
544    /// Get root leaf header for a given epoch
545    fn map_header<E, F, R>(&self, epoch: E, f: F) -> Option<R>
546    where
547        E: Into<EpochNumber>,
548        F: FnMut(&Header) -> R,
549    {
550        self.inner
551            .read()
552            .epoch_committee(epoch.into())
553            .and_then(|committee| committee.header.as_ref().map(f))
554    }
555
556    fn randomized_committee(
557        &self,
558        epoch: EpochNumber,
559        drb: DrbResult,
560    ) -> Option<RandomizedCommittee<StakeTableEntry<PubKey>>> {
561        let inner = self.inner.read();
562        let Some(raw_stake_table) = inner.epoch_committee(epoch) else {
563            error!(
564                "randomized_committee({epoch}, {drb:?}) was called, but we do not yet have the \
565                 stake table for epoch {epoch}"
566            );
567            return None;
568        };
569
570        let leaders = raw_stake_table
571            .eligible_leaders
572            .clone()
573            .into_iter()
574            .map(|peer_config| peer_config.stake_table_entry)
575            .collect::<Vec<_>>();
576
577        Some(generate_stake_cdf(leaders, drb))
578    }
579}
580
581/// returns the block reward for the given epoch.
582///
583/// Reward depends on the epoch root header version:
584/// V3: Returns the fixed block reward as V3 only supports fixed reward
585/// >= V4 : Returns the dynamic block reward
586///
587/// It also attempts catchup for the root header if not present in the committee,
588/// and also for the stake table of the previous epoch
589/// before computing the dynamic block reward
590pub async fn fetch_and_calculate_block_reward(
591    coordinator: EpochMembershipCoordinator<SeqTypes>,
592    current_epoch: EpochNumber,
593) -> anyhow::Result<RewardAmount> {
594    let committee;
595    let first_epoch;
596    let fixed_block_reward;
597    {
598        let membership = coordinator.membership().inner.read();
599        fixed_block_reward = membership.fixed_block_reward;
600
601        committee = membership
602            .epoch_committee(current_epoch)
603            .context(format!("committee not found for epoch={current_epoch:?}"))?
604            .clone();
605
606        // Return early if committee has a reward already
607        if let Some(reward) = committee.block_reward {
608            return Ok(reward);
609        }
610
611        first_epoch = membership.first_epoch.context(format!(
612            "First epoch not initialized (current_epoch={current_epoch})"
613        ))?;
614    }
615
616    if *current_epoch <= *first_epoch + 1 {
617        bail!(
618            "epoch is in first two epochs: current_epoch={current_epoch}, \
619             first_epoch={first_epoch}"
620        );
621    }
622
623    let header = match committee.header.clone() {
624        Some(header) => header,
625        None => {
626            let root_epoch = current_epoch.checked_sub(2).context(format!(
627                "Epoch calculation underflow (current_epoch={current_epoch})"
628            ))?;
629
630            info!(?root_epoch, "catchup epoch root header");
631
632            let leaf = coordinator
633                .membership()
634                .get_epoch_root(EpochNumber::new(root_epoch))
635                .await
636                .with_context(|| format!("Failed to get epoch root for root_epoch={root_epoch}"))?;
637            leaf.block_header().clone()
638        },
639    };
640
641    if header.version() <= EPOCH_VERSION {
642        return fixed_block_reward.context(format!(
643            "Fixed block reward not found for current_epoch={current_epoch}"
644        ));
645    }
646
647    let prev_epoch_u64 = current_epoch.checked_sub(1).context(format!(
648        "Underflow: cannot compute previous epoch when current_epoch={current_epoch}"
649    ))?;
650
651    let prev_epoch = EpochNumber::new(prev_epoch_u64);
652
653    // If the previous epoch is not in the first two epochs,
654    // there should be a stake table for it
655    if *prev_epoch > *first_epoch + 1
656        && let Err(err) = coordinator.stake_table_for_epoch(Some(prev_epoch))
657    {
658        info!("failed to get membership for epoch={prev_epoch:?}: {err:#}");
659
660        coordinator
661            .wait_for_catchup(prev_epoch)
662            .await
663            .context(format!("failed to catch up for epoch={prev_epoch}"))?;
664    }
665
666    coordinator
667        .membership()
668        .calculate_dynamic_block_reward(current_epoch, &header, &committee.validators)
669        .await
670        .with_context(|| {
671            format!("dynamic block reward calculation failed for epoch={current_epoch}")
672        })?
673        .with_context(|| format!("dynamic block reward returned None. epoch={current_epoch}"))
674}
675
676impl Membership<SeqTypes> for EpochCommittees {
677    type Error = EpochCommitteesError;
678    type Snapshot = EpochSnapshot;
679    type NonEpochSnapshot = NonEpochSnapshot;
680
681    fn snapshot(&self, epoch: EpochNumber) -> Option<Self::Snapshot> {
682        self.inner.read().snapshots.get(&epoch).cloned()
683    }
684
685    fn non_epoch_snapshot(&self) -> Self::NonEpochSnapshot {
686        self.inner.read().non_epoch_snapshot.clone()
687    }
688
689    /// Adds the epoch committee and block reward for a given epoch,
690    /// either by fetching from L1 or using local state if available.
691    /// It also calculates and stores the block reward based on header version.
692    async fn add_epoch_root(&self, block_header: Header) -> Result<(), Self::Error> {
693        let block_number = block_header.block_number();
694
695        let epoch_height = *self.epoch_height;
696
697        let epoch = EpochNumber::new(epoch_from_block_number(block_number, epoch_height) + 2);
698
699        info!(?epoch, "adding epoch root. height={:?}", block_number);
700
701        if !is_epoch_root(block_number, epoch_height) {
702            error!(
703                "`add_epoch_root` was called with a block header that was not the root block for \
704                 an epoch. This should never happen. Header:\n\n{block_header:?}"
705            );
706            return Err(Self::Error::NoRootBlock(block_number.into()));
707        }
708
709        let version = block_header.version();
710        // Update the chain config if the block header contains a newer one.
711        self.fetcher
712            .update_chain_config(&block_header)
713            .await
714            .map_err(Self::Error::Fetcher)?;
715
716        let mut block_reward = None;
717        // Even if the current header is the root of the epoch which falls in the post upgrade
718        // we use the fixed block reward
719        if version == EPOCH_VERSION {
720            let reward = self
721                .fetch_and_update_fixed_block_reward(epoch)
722                .await
723                .map_err(Self::Error::Fetcher)?;
724            block_reward = Some(reward);
725        }
726
727        let epoch_committee = self.inner.read().epoch_committee(epoch).cloned();
728
729        // TODO: If the stake table is missing should it be fetched by an unbounded
730        // number of tasks?
731
732        // If the epoch committee:
733        // - exists and has a header stake table hash and block reward, return early.
734        // - exists without a reward, reuse validators and update reward.
735        // and fetch from L1 if the stake table hash is missing.
736        // - doesn't exist, fetch it from L1.
737        let (active_validators, all_validators, stake_table_hash) = match epoch_committee {
738            Some(committee)
739                if committee.block_reward.is_some()
740                    && committee.header.is_some()
741                    && committee.stake_table_hash.is_some() =>
742            {
743                info!(
744                    ?epoch,
745                    "committee already has block reward, header, and stake table hash; skipping \
746                     add_epoch_root"
747                );
748                return Ok(());
749            },
750
751            Some(committee) => {
752                if let Some(reward) = committee.block_reward {
753                    block_reward = Some(reward);
754                }
755
756                if let Some(hash) = committee.stake_table_hash {
757                    (committee.validators.clone(), Default::default(), Some(hash))
758                } else {
759                    // if stake table hash is missing then recalculate from events
760                    info!(
761                        "Stake table hash missing for epoch {epoch}. recalculating by fetching \
762                         from l1."
763                    );
764                    let set = self
765                        .fetcher
766                        .fetch(epoch, &block_header)
767                        .await
768                        .map_err(Self::Error::Fetcher)?;
769                    (
770                        set.active_validators,
771                        set.all_validators,
772                        set.stake_table_hash,
773                    )
774                }
775            },
776
777            None => {
778                info!("Stake table missing for epoch {epoch}. Fetching from L1.");
779                let set = self
780                    .fetcher
781                    .fetch(epoch, &block_header)
782                    .await
783                    .map_err(Self::Error::Fetcher)?;
784                (
785                    set.active_validators,
786                    set.all_validators,
787                    set.stake_table_hash,
788                )
789            },
790        };
791
792        // If we are past the DRB+Header upgrade point,
793        // and don't have block reward
794        // calculate the dynamic block reward based on validator info and block header.
795        if block_reward.is_none() && version >= DRB_AND_HEADER_UPGRADE_VERSION {
796            info!(?epoch, "calculating dynamic block reward");
797            let reward = self
798                .calculate_dynamic_block_reward(epoch, &block_header, &active_validators)
799                .await
800                .map_err(Self::Error::Reward)?;
801
802            info!(?epoch, "calculated dynamic block reward = {reward:?}");
803            block_reward = reward;
804        }
805
806        let committee = EpochCommittee::new(
807            active_validators.clone(),
808            block_reward,
809            stake_table_hash,
810            Some(block_header.clone()),
811        );
812
813        let previous_epoch;
814        let previous_committee;
815        let previous_validators;
816        {
817            let mut inner = self.inner.write();
818            inner.put_epoch_committee(epoch, Arc::new(committee));
819            // previous_epoch is the epoch prior to `epoch`,
820            // or the epoch immediately succeeding the block header
821            previous_epoch = EpochNumber::new(epoch.saturating_sub(1));
822            previous_committee = inner.epoch_committee(previous_epoch).cloned();
823            // garbage collect the validator set
824            inner.all_validators = inner.all_validators.split_off(&previous_epoch);
825            // extract `all_validators` for the previous epoch
826            previous_validators = inner.all_validators.remove(&previous_epoch);
827            inner.all_validators.insert(epoch, all_validators.clone());
828        }
829
830        let persistence_lock = self.fetcher.persistence.lock().await;
831
832        let decided_hash = block_header.next_stake_table_hash();
833
834        // we store the information from the previous epoch's in-memory committeee
835        // if the decided stake_table_hash is consistent with what we get
836        //
837        // in principle this is unnecessary and we could've stored these right away,
838        // without offsetting the epoch. but the intention is to catch L1 provider issues
839        // if there is a mismatch
840        if let Some(previous_committee) = previous_committee {
841            if decided_hash.is_none() || decided_hash == previous_committee.stake_table_hash {
842                if let Err(e) = persistence_lock
843                    .store_stake(
844                        previous_epoch,
845                        previous_committee.validators.clone(),
846                        previous_committee.block_reward,
847                        previous_committee.stake_table_hash,
848                    )
849                    .await
850                {
851                    error!(
852                        ?e,
853                        ?previous_epoch,
854                        "`add_epoch_root`, error storing stake table"
855                    );
856                }
857
858                if let Some(previous_validators) = previous_validators
859                    && let Err(e) = persistence_lock
860                        .store_all_validators(previous_epoch, previous_validators)
861                        .await
862                {
863                    error!(?e, ?epoch, "`add_epoch_root`, error storing all validators");
864                }
865            } else {
866                panic!(
867                    "The decided block header's `next_stake_table_hash` does not match the hash \
868                     of the stake table we have. This is an unrecoverable error likely due to \
869                     issues with your L1 RPC provider. Decided:\n\n{:?}Actual:\n\n{:?}",
870                    decided_hash, previous_committee.stake_table_hash
871                );
872            }
873        }
874
875        Ok(())
876    }
877
878    async fn get_epoch_root(&self, epoch: EpochNumber) -> Result<Leaf2, Self::Error> {
879        let block_height = root_block_in_epoch(*epoch, *self.epoch_height());
880        let peers = self.fetcher.peers.clone();
881        let snapshot = self
882            .snapshot(epoch)
883            .ok_or_else(|| Self::Error::Message(format!("no committee for epoch={epoch}")))?;
884        let stake_table = HSStakeTable(snapshot.stake_table().cloned().collect());
885        let success_threshold = snapshot.success_threshold();
886        let leaf: Leaf2 = peers
887            .fetch_leaf(block_height, stake_table, success_threshold)
888            .await
889            .map_err(Self::Error::Catchup)?;
890        Ok(leaf)
891    }
892
893    async fn get_epoch_drb(&self, epoch: EpochNumber) -> Result<DrbResult, Self::Error> {
894        let peers = self.fetcher.peers.clone();
895
896        // Try to retrieve the DRB result from an existing snapshot's randomized committee.
897        if let Some(snap) = self.snapshot(epoch)
898            && let Some(rand) = &snap.inner.randomized
899        {
900            return Ok(rand.drb_result());
901        }
902
903        // Otherwise, we try to fetch the epoch root leaf
904        let previous_epoch = match epoch.checked_sub(1) {
905            Some(epoch) => EpochNumber::new(epoch),
906            None => {
907                return self
908                    .snapshot(epoch)
909                    .and_then(|s| s.inner.randomized.as_ref().map(|r| r.drb_result()))
910                    .ok_or_else(|| {
911                        Self::Error::Message(format!(
912                            "Missing randomized committee for epoch {epoch}"
913                        ))
914                    });
915            },
916        };
917
918        let prev_snapshot = self.snapshot(previous_epoch).ok_or_else(|| {
919            Self::Error::Message(format!("no committee for previous_epoch={previous_epoch}"))
920        })?;
921        let stake_table = HSStakeTable(prev_snapshot.stake_table().cloned().collect());
922        let success_threshold = prev_snapshot.success_threshold();
923
924        let block_height = transition_block_for_epoch(*previous_epoch, *self.epoch_height());
925
926        debug!(
927            "Getting DRB for epoch {}, block height {}",
928            epoch, block_height
929        );
930        let drb_leaf = peers
931            .try_fetch_leaf(1, block_height, stake_table, success_threshold)
932            .await
933            .map_err(Self::Error::Catchup)?;
934
935        let Some(drb) = drb_leaf.next_drb_result else {
936            error!(
937                "We received a leaf that should contain a DRB result, but the DRB result is \
938                 missing: {:?}",
939                drb_leaf
940            );
941
942            return Err(Self::Error::Message(
943                "DRB leaf is missing the DRB result.".to_string(),
944            ));
945        };
946
947        Ok(drb)
948    }
949
950    fn add_drb_result(&self, epoch: EpochNumber, drb: DrbResult) {
951        info!("Adding DRB result {drb:?} to epoch {epoch}");
952        if let Some(committee) = self.randomized_committee(epoch, drb) {
953            self.inner
954                .write()
955                .put_randomized_committee(epoch, Arc::new(committee));
956        }
957    }
958
959    fn set_first_epoch(&self, epoch: EpochNumber, initial_drb_result: DrbResult) {
960        let rand_comm = Arc::new(
961            self.randomized_committee(EpochNumber::genesis(), initial_drb_result)
962                .expect("committee exist at genesis"),
963        );
964
965        let mut inner = self.inner.write();
966        inner.first_epoch = Some(epoch);
967
968        let epoch_committee = inner
969            .epoch_committee(EpochNumber::genesis())
970            .expect("committee exists at genesis")
971            .clone();
972
973        // Build snapshots for `epoch` and `epoch + 1` with the genesis
974        // stake table and the initial DRB result.
975        inner.put_epoch_committee(epoch, epoch_committee.clone());
976        inner.put_randomized_committee(epoch, rand_comm.clone());
977        inner.put_epoch_committee(epoch + 1, epoch_committee);
978        inner.put_randomized_committee(epoch + 1, rand_comm);
979    }
980
981    fn first_epoch(&self) -> Option<EpochNumber> {
982        self.inner.read().first_epoch
983    }
984
985    fn highest_known_epoch(&self) -> Option<EpochNumber> {
986        self.inner.read().snapshots.keys().max().copied()
987    }
988
989    fn add_da_committee(&self, first_epoch: EpochNumber, committee: Vec<PeerConfig<SeqTypes>>) {
990        let indexed_committee: HashMap<PubKey, _> = committee
991            .iter()
992            .map(|peer_config| {
993                (
994                    PubKey::public_key(&peer_config.stake_table_entry),
995                    peer_config.clone(),
996                )
997            })
998            .collect();
999
1000        let da_committee = Arc::new(DaCommittee {
1001            committee,
1002            indexed_committee,
1003        });
1004
1005        let mut inner = self.inner.write();
1006        inner
1007            .da_committees
1008            .insert(first_epoch, da_committee.clone());
1009
1010        // The DA committee inserted at `first_epoch` applies to every epoch
1011        // up to (but not including) the next `da_committees` key. Snapshots
1012        // for those epochs were built with whatever DA was current at the
1013        // time and must be rebuilt so reads of `da_stake_table()` etc.
1014        // reflect the new committee.
1015        let upper = inner
1016            .da_committees
1017            .range((Bound::Excluded(first_epoch), Bound::Unbounded))
1018            .next()
1019            .map(|(k, _)| *k);
1020
1021        let range = if let Some(u) = upper {
1022            (Bound::Included(first_epoch), Bound::Excluded(u))
1023        } else {
1024            (Bound::Included(first_epoch), Bound::Unbounded)
1025        };
1026
1027        let affected: Vec<EpochNumber> = inner.snapshots.range(range).map(|(k, _)| *k).collect();
1028        let first_epoch_field = inner.first_epoch;
1029
1030        for epoch in affected {
1031            let Some(existing) = inner.snapshots.get(&epoch) else {
1032                continue;
1033            };
1034            let new_snapshot = EpochSnapshot::new(
1035                epoch,
1036                first_epoch_field,
1037                existing.inner.committee.clone(),
1038                existing.inner.randomized.clone(),
1039                da_committee.clone(),
1040            );
1041            inner.snapshots.insert(epoch, new_snapshot);
1042        }
1043    }
1044}
1045
1046#[derive(Error, Debug)]
1047pub enum EpochCommitteesError {
1048    #[error("could not lookup leader")]
1049    LeaderLookupError,
1050
1051    #[error("block {0} is not the root block for an epoch")]
1052    NoRootBlock(BlockNumber),
1053
1054    #[error("fetcher error: {0}")]
1055    Fetcher(#[source] anyhow::Error),
1056
1057    #[error("{0}")]
1058    Message(String),
1059
1060    #[error("state catchup error: {0}")]
1061    Catchup(#[source] anyhow::Error),
1062
1063    #[error("reward calculation error: {0}")]
1064    Reward(#[source] anyhow::Error),
1065}
1066
1067impl Inner {
1068    /// The DA committee that applies to `epoch`, or the non-epoch fallback
1069    /// when `epoch` is `None` or no explicit DA committee covers it.
1070    fn resolve_da_committee(&self, epoch: Option<EpochNumber>) -> Arc<DaCommittee> {
1071        if let Some(e) = epoch {
1072            // The greatest key ≤ `e` is the DA committee that applies.
1073            self.da_committees
1074                .range((Bound::Included(0.into()), Bound::Included(e)))
1075                .last()
1076                .map(|(_, committee)| committee.clone())
1077                .unwrap_or_else(|| self.non_epoch_snapshot.inner.da_committee.clone())
1078        } else {
1079            self.non_epoch_snapshot.inner.da_committee.clone()
1080        }
1081    }
1082
1083    /// Borrow the per-epoch `EpochCommittee` if loaded.
1084    fn epoch_committee(&self, e: EpochNumber) -> Option<&Arc<EpochCommittee>> {
1085        self.snapshots.get(&e).map(|s| &s.inner.committee)
1086    }
1087
1088    fn address(&self, e: EpochNumber, key: &BLSPubKey) -> anyhow::Result<Address> {
1089        self.epoch_committee(e)
1090            .context("state for found")?
1091            .address_mapping
1092            .get(key)
1093            .copied()
1094            .context(format!(
1095                "failed to get ethereum address for bls key {key}. epoch={e}"
1096            ))
1097    }
1098
1099    fn active_validators(&self, e: EpochNumber) -> anyhow::Result<AuthenticatedValidatorMap> {
1100        Ok(self
1101            .epoch_committee(e)
1102            .context("state not found")?
1103            .validators
1104            .clone())
1105    }
1106
1107    /// Rebuild (or insert) the snapshot for `epoch` carrying forward
1108    /// `randomized` from any existing snapshot for that epoch.
1109    fn put_epoch_committee(&mut self, epoch: EpochNumber, committee: Arc<EpochCommittee>) {
1110        let randomized = self
1111            .snapshots
1112            .get(&epoch)
1113            .and_then(|s| s.inner.randomized.clone());
1114        let da_committee = self.resolve_da_committee(Some(epoch));
1115        let first_epoch = self.first_epoch;
1116        self.snapshots.insert(
1117            epoch,
1118            EpochSnapshot::new(epoch, first_epoch, committee, randomized, da_committee),
1119        );
1120    }
1121
1122    /// Rebuild the snapshot for `epoch` with a new randomized committee,
1123    /// carrying forward the existing committee/da. No-op if no snapshot
1124    /// for `epoch` exists yet.
1125    fn put_randomized_committee(
1126        &mut self,
1127        epoch: EpochNumber,
1128        randomized: Arc<RandomizedCommittee<StakeTableEntry<PubKey>>>,
1129    ) {
1130        let Some(existing) = self.snapshots.get(&epoch).cloned() else {
1131            return;
1132        };
1133        let committee = existing.inner.committee.clone();
1134        let da_committee = existing.inner.da_committee.clone();
1135        let first_epoch = self.first_epoch;
1136        self.snapshots.insert(
1137            epoch,
1138            EpochSnapshot::new(
1139                epoch,
1140                first_epoch,
1141                committee,
1142                Some(randomized),
1143                da_committee,
1144            ),
1145        );
1146    }
1147}
1148
1149/// A consistent per-epoch view of `EpochCommittees`.
1150///
1151/// Returned by [`Membership::snapshot`].
1152#[derive(Clone, Debug)]
1153pub struct EpochSnapshot {
1154    inner: Arc<EpochSnapshotInner>,
1155}
1156
1157#[derive(Debug)]
1158struct EpochSnapshotInner {
1159    epoch: EpochNumber,
1160    first_epoch: Option<EpochNumber>,
1161    committee: Arc<EpochCommittee>,
1162    randomized: Option<Arc<RandomizedCommittee<StakeTableEntry<PubKey>>>>,
1163    da_committee: Arc<DaCommittee>,
1164}
1165
1166impl EpochSnapshot {
1167    fn new(
1168        epoch: EpochNumber,
1169        first_epoch: Option<EpochNumber>,
1170        committee: Arc<EpochCommittee>,
1171        randomized: Option<Arc<RandomizedCommittee<StakeTableEntry<PubKey>>>>,
1172        da_committee: Arc<DaCommittee>,
1173    ) -> Self {
1174        Self {
1175            inner: Arc::new(EpochSnapshotInner {
1176                epoch,
1177                first_epoch,
1178                committee,
1179                randomized,
1180                da_committee,
1181            }),
1182        }
1183    }
1184}
1185
1186impl EpochSnapshot {
1187    /// Index of `key` in this epoch's stake table, if present.
1188    pub fn validator_index(&self, key: &PubKey) -> Option<usize> {
1189        self.inner.committee.stake_table.get_index_of(key)
1190    }
1191
1192    /// The full validator record (account, stake, delegators, etc.) for `key`.
1193    pub fn validator_config(
1194        &self,
1195        key: &BLSPubKey,
1196    ) -> anyhow::Result<&AuthenticatedValidator<BLSPubKey>> {
1197        let address = self
1198            .inner
1199            .committee
1200            .address_mapping
1201            .get(key)
1202            .context(format!(
1203                "failed to get ethereum address for bls key {key}. epoch={}",
1204                self.inner.epoch
1205            ))?;
1206        self.inner
1207            .committee
1208            .validators
1209            .get(address)
1210            .context("validator not found")
1211    }
1212
1213    pub fn epoch_block_reward(&self) -> Option<RewardAmount> {
1214        self.inner.committee.block_reward
1215    }
1216
1217    pub fn validators(&self) -> &AuthenticatedValidatorMap {
1218        &self.inner.committee.validators
1219    }
1220}
1221
1222impl MembershipSnapshot<SeqTypes> for EpochSnapshot {
1223    type Error = EpochCommitteesError;
1224    type StakeTableHash = StakeTableState;
1225
1226    fn epoch(&self) -> EpochNumber {
1227        self.inner.epoch
1228    }
1229
1230    fn first_epoch(&self) -> Option<EpochNumber> {
1231        self.inner.first_epoch
1232    }
1233
1234    fn has_drb(&self) -> bool {
1235        self.inner.randomized.is_some()
1236    }
1237
1238    fn stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send {
1239        self.inner.committee.stake_table.values()
1240    }
1241
1242    fn da_stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send {
1243        self.inner.da_committee.committee.iter()
1244    }
1245
1246    fn committee_members(&self, _: ViewNumber) -> impl ExactSizeIterator<Item = &PubKey> + Send {
1247        self.inner.committee.stake_table.keys()
1248    }
1249
1250    fn da_committee_members(&self, _: ViewNumber) -> impl ExactSizeIterator<Item = &PubKey> + Send {
1251        self.inner.da_committee.indexed_committee.keys()
1252    }
1253
1254    fn stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1255        self.inner.committee.stake_table.get(key).cloned()
1256    }
1257
1258    fn da_stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1259        self.inner.da_committee.indexed_committee.get(key).cloned()
1260    }
1261
1262    fn has_stake(&self, key: &PubKey) -> bool {
1263        self.stake(key)
1264            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1265            .unwrap_or_default()
1266    }
1267
1268    fn has_da_stake(&self, key: &PubKey) -> bool {
1269        self.da_stake(key)
1270            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1271            .unwrap_or_default()
1272    }
1273
1274    /// Returns the leader's public key for a given view number in this epoch.
1275    ///
1276    /// # Errors
1277    ///
1278    /// Returns `LeaderLookupError` if `first_epoch` is unset, the snapshot's
1279    /// epoch is before `first_epoch`, or the randomized committee is missing.
1280    fn lookup_leader(&self, view: ViewNumber) -> Result<PubKey, Self::Error> {
1281        let inner = &self.inner;
1282        let Some(first_epoch) = inner.first_epoch else {
1283            error!(
1284                "leader requested for epoch {} but first_epoch is unset",
1285                inner.epoch,
1286            );
1287            return Err(EpochCommitteesError::LeaderLookupError);
1288        };
1289        if inner.epoch < first_epoch {
1290            error!(
1291                "leader requested for epoch {} before first epoch {first_epoch}",
1292                inner.epoch,
1293            );
1294            return Err(EpochCommitteesError::LeaderLookupError);
1295        }
1296        let Some(rand) = inner.randomized.as_deref() else {
1297            error!(
1298                "missing randomized committee for epoch {} in snapshot",
1299                inner.epoch,
1300            );
1301            return Err(EpochCommitteesError::LeaderLookupError);
1302        };
1303        Ok(PubKey::public_key(&select_randomized_leader(rand, *view)))
1304    }
1305
1306    fn stake_table_hash(&self) -> Option<Commitment<Self::StakeTableHash>> {
1307        self.inner.committee.stake_table_hash
1308    }
1309}
1310
1311/// A consistent pre-epoch view of `EpochCommittees`.
1312///
1313/// Returned by [`Membership::non_epoch_snapshot`].
1314#[derive(Clone, Debug)]
1315pub struct NonEpochSnapshot {
1316    inner: Arc<NonEpochSnapshotInner>,
1317}
1318
1319#[derive(Debug)]
1320struct NonEpochSnapshotInner {
1321    committee: Arc<NonEpochCommittee>,
1322    da_committee: Arc<DaCommittee>,
1323}
1324
1325impl NonEpochSnapshot {
1326    fn new(committee: Arc<NonEpochCommittee>, da_committee: Arc<DaCommittee>) -> Self {
1327        Self {
1328            inner: Arc::new(NonEpochSnapshotInner {
1329                committee,
1330                da_committee,
1331            }),
1332        }
1333    }
1334}
1335
1336impl NonEpochMembershipSnapshot<SeqTypes> for NonEpochSnapshot {
1337    type Error = EpochCommitteesError;
1338
1339    fn stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send + '_ {
1340        self.inner.committee.stake_table.iter()
1341    }
1342
1343    fn da_stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send + '_ {
1344        self.inner.da_committee.committee.iter()
1345    }
1346
1347    fn committee_members(
1348        &self,
1349        _: ViewNumber,
1350    ) -> impl ExactSizeIterator<Item = &PubKey> + Send + '_ {
1351        self.inner.committee.indexed_stake_table.keys()
1352    }
1353
1354    fn da_committee_members(
1355        &self,
1356        _: ViewNumber,
1357    ) -> impl ExactSizeIterator<Item = &PubKey> + Send + '_ {
1358        self.inner.da_committee.indexed_committee.keys()
1359    }
1360
1361    fn stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1362        self.inner.committee.indexed_stake_table.get(key).cloned()
1363    }
1364
1365    fn da_stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1366        self.inner.da_committee.indexed_committee.get(key).cloned()
1367    }
1368
1369    fn has_stake(&self, key: &PubKey) -> bool {
1370        self.stake(key)
1371            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1372            .unwrap_or_default()
1373    }
1374
1375    fn has_da_stake(&self, key: &PubKey) -> bool {
1376        self.da_stake(key)
1377            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1378            .unwrap_or_default()
1379    }
1380
1381    fn lookup_leader(&self, view: ViewNumber) -> Result<PubKey, Self::Error> {
1382        let leaders = &self.inner.committee.eligible_leaders;
1383        if leaders.is_empty() {
1384            return Err(EpochCommitteesError::LeaderLookupError);
1385        }
1386        let index = *view as usize % leaders.len();
1387        Ok(PubKey::public_key(&leaders[index].stake_table_entry))
1388    }
1389}
1390
1391#[cfg(test)]
1392mod tests {
1393    use std::sync::{
1394        Arc,
1395        atomic::{AtomicBool, AtomicUsize, Ordering},
1396    };
1397
1398    use committable::Committable;
1399    use hotshot_query_service::testing::mocks::MOCK_UPGRADE;
1400    use hotshot_types::{
1401        ValidatorConfig,
1402        traits::{BlockPayload, block_contents::BlockHeader},
1403    };
1404    use tokio::{task::JoinSet, time::Duration};
1405
1406    use super::*;
1407    use crate::{NodeState, Payload, Transaction};
1408
1409    /// Wall-clock target each concurrency test runs for. Long enough to
1410    /// catch flaky races that one-shot tests would miss; short enough to
1411    /// be tolerable in CI.
1412    const TEST_DURATION: Duration = Duration::from_secs(5);
1413
1414    fn build_committees(num_peers: u64) -> EpochCommittees {
1415        let peers: Vec<PeerConfig<SeqTypes>> = (0..num_peers)
1416            .map(|i| {
1417                ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1418                    [42u8; 32],
1419                    i,
1420                    U256::from(100),
1421                    true,
1422                )
1423                .public_config()
1424            })
1425            .collect();
1426        EpochCommittees::new_stake(peers.clone(), peers, None, Fetcher::mock(), 100u64)
1427    }
1428
1429    // Concurrent reads must not panic or deadlock while a writer drives
1430    // real mutations on the same `Inner` lock.
1431    //
1432    // Per-call invariants (within a single method invocation) are
1433    // checked. Cross-call invariants are *not*: each public method
1434    // takes its own short-lived lock, so a sequence of two read calls
1435    // observes two snapshots in time and the writer can run between
1436    // them. See the `EpochCommittees` doc-comment for the rationale.
1437    //
1438    // To make the contention real, we pre-populate `inner.state` for
1439    // several extra epochs so the writer's `add_drb_result` calls
1440    // actually take the write lock. Without this they would early-exit
1441    // on the missing-state branch and never contend.
1442    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1443    async fn concurrent_reads_during_mutations() {
1444        let committees = build_committees(8);
1445        committees.set_first_epoch(EpochNumber::new(1), [0u8; 32]);
1446
1447        // Pre-populate snapshots for epochs 2..6 by cloning the genesis
1448        // committee. `add_drb_result(e)` is a no-op when no snapshot for
1449        // `e` exists, so without this the writer never takes the write
1450        // lock for those epochs.
1451        {
1452            let mut inner = committees.inner.write();
1453            let template = inner
1454                .epoch_committee(EpochNumber::genesis())
1455                .expect("genesis committee exists")
1456                .clone();
1457            for e in 2..6 {
1458                inner.put_epoch_committee(EpochNumber::new(e), template.clone());
1459            }
1460        }
1461
1462        let stop = Arc::new(AtomicBool::new(false));
1463        let mut tasks = JoinSet::new();
1464
1465        for _ in 0..8 {
1466            let c = committees.clone();
1467            let stop = Arc::clone(&stop);
1468            tasks.spawn(async move {
1469                let stable = EpochNumber::new(1);
1470                let mutating = EpochNumber::new(3);
1471                let view = ViewNumber::new(0);
1472                while !stop.load(Ordering::Relaxed) {
1473                    // Stable epoch — the writer never touches
1474                    // `inner.state[1]` or `inner.randomized_committees[1]`
1475                    // (both were set by `set_first_epoch` before this
1476                    // loop and stay unchanged thereafter), so the
1477                    // assertions below hold even across separate snapshots.
1478                    let stable_snap = c.snapshot(stable).expect("stable snapshot");
1479                    let len = stable_snap.stake_table().len();
1480                    assert_eq!(len, stable_snap.total_nodes());
1481                    let leader = stable_snap.lookup_leader(view).expect("leader");
1482                    assert!(
1483                        stable_snap.committee_members(view).any(|p| p == &leader),
1484                        "leader {leader:?} not in committee_members for stable epoch",
1485                    );
1486                    assert_eq!(c.first_epoch(), Some(stable));
1487
1488                    // Mutating epoch — the writer churns
1489                    // `randomized_committees[3]`. Just exercise the API
1490                    // path; the value can vary or be transiently absent
1491                    // between calls and that is the documented
1492                    // behaviour, not a bug.
1493                    let _ = c.snapshot(mutating);
1494                    if let Some(s) = c.snapshot(mutating) {
1495                        let _ = s.lookup_leader(view);
1496                    }
1497                    tokio::task::yield_now().await;
1498                }
1499            });
1500        }
1501
1502        // Writer driving real mutations against fields the readers see.
1503        // Loops until the test signals stop, so the contention window
1504        // matches `TEST_DURATION`.
1505        tasks.spawn({
1506            let c = committees.clone();
1507            let stop = Arc::clone(&stop);
1508            async move {
1509                let extra: Vec<PeerConfig<SeqTypes>> = (0..3)
1510                    .map(|i| {
1511                        ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1512                            [99u8; 32],
1513                            i,
1514                            U256::from(50),
1515                            true,
1516                        )
1517                        .public_config()
1518                    })
1519                    .collect();
1520                let mut i: u64 = 0;
1521                while !stop.load(Ordering::Relaxed) {
1522                    // Pre-populated epochs 2..5 — these acquire the
1523                    // write lock and contend with reader read locks.
1524                    c.add_drb_result(EpochNumber::new(2 + (i % 4)), [(i % 256) as u8; 32]);
1525                    // Non-existent epoch — exercises the read-then-no-op
1526                    // branch of `add_drb_result` (read lock only).
1527                    c.add_drb_result(EpochNumber::new(10_000 + i), [0xAB; 32]);
1528                    if i.is_multiple_of(50) {
1529                        c.add_da_committee(i.into(), extra.clone());
1530                    }
1531                    if i.is_multiple_of(16) {
1532                        tokio::task::yield_now().await;
1533                    }
1534                    i += 1;
1535                }
1536            }
1537        });
1538
1539        tokio::time::sleep(TEST_DURATION).await;
1540        stop.store(true, Ordering::Relaxed);
1541        while let Some(res) = tasks.join_next().await {
1542            res.expect("task panicked");
1543        }
1544    }
1545
1546    // A task concurrent with `set_first_epoch` must never see a
1547    // partially-applied state, since all mutations in `set_first_epoch`
1548    // happen under a single write lock.
1549    //
1550    // We can't verify this through the public API because each method
1551    // takes its own lock — between two reader calls the writer can run
1552    // to completion (a real TOCTOU window in the new locking model, not
1553    // a torn read). Instead we take a single-locked snapshot of all
1554    // affected fields directly and assert the snapshot is internally
1555    // consistent: either the pre-state (nothing set) or the post-state
1556    // (everything set together).
1557    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1558    async fn set_first_epoch_is_atomic() {
1559        // Snapshot all fields touched by `set_first_epoch` under a
1560        // single read lock so the reader observes one consistent state.
1561        fn snapshot(c: &EpochCommittees, e: EpochNumber) -> Snapshot {
1562            let inner = c.inner.read();
1563            Snapshot {
1564                first_epoch: inner.first_epoch,
1565                state_e: inner.snapshots.contains_key(&e),
1566                state_e1: inner.snapshots.contains_key(&(e + 1)),
1567                rand_e: inner
1568                    .snapshots
1569                    .get(&e)
1570                    .is_some_and(|s| s.inner.randomized.is_some()),
1571                rand_e1: inner
1572                    .snapshots
1573                    .get(&(e + 1))
1574                    .is_some_and(|s| s.inner.randomized.is_some()),
1575            }
1576        }
1577
1578        #[derive(Debug)]
1579        struct Snapshot {
1580            first_epoch: Option<EpochNumber>,
1581            state_e: bool,
1582            state_e1: bool,
1583            rand_e: bool,
1584            rand_e1: bool,
1585        }
1586
1587        let target = EpochNumber::new(10);
1588
1589        // Concurrency bugs are flaky — loop until we've spent
1590        // `TEST_DURATION` widening the window for catching a torn
1591        // state. Each round is one race attempt against
1592        // `set_first_epoch`.
1593        let test_start = tokio::time::Instant::now();
1594        let mut round: u64 = 0;
1595        while test_start.elapsed() < TEST_DURATION {
1596            let committees = build_committees(4);
1597            let stop = Arc::new(AtomicBool::new(false));
1598            let post_observations = Arc::new(AtomicUsize::new(0));
1599
1600            let reader = {
1601                let c = committees.clone();
1602                let stop = Arc::clone(&stop);
1603                let post = Arc::clone(&post_observations);
1604                tokio::spawn(async move {
1605                    while !stop.load(Ordering::Relaxed) {
1606                        let s = snapshot(&c, target);
1607                        match s.first_epoch {
1608                            None => assert!(
1609                                !s.state_e && !s.state_e1 && !s.rand_e && !s.rand_e1,
1610                                "torn snapshot: first_epoch=None but some target state present: \
1611                                 {s:?}",
1612                            ),
1613                            Some(e) => {
1614                                assert_eq!(e, target, "only target is ever set");
1615                                assert!(
1616                                    s.state_e && s.state_e1 && s.rand_e && s.rand_e1,
1617                                    "torn snapshot: first_epoch=Some but some target state \
1618                                     missing: {s:?}",
1619                                );
1620                                post.fetch_add(1, Ordering::Relaxed);
1621                            },
1622                        }
1623                        tokio::task::yield_now().await;
1624                    }
1625                })
1626            };
1627
1628            // Brief warmup so the reader is in its loop.
1629            tokio::time::sleep(Duration::from_millis(2)).await;
1630            committees.set_first_epoch(target, [(round as u8) ^ 0xA5; 32]);
1631
1632            // Wait until the reader has observed the post-state at least
1633            // once, with a generous timeout.
1634            let deadline = tokio::time::Instant::now() + Duration::from_millis(200);
1635            while tokio::time::Instant::now() < deadline
1636                && post_observations.load(Ordering::Relaxed) == 0
1637            {
1638                tokio::task::yield_now().await;
1639            }
1640
1641            stop.store(true, Ordering::Relaxed);
1642            reader.await.expect("reader panicked");
1643            assert!(
1644                post_observations.load(Ordering::Relaxed) > 0,
1645                "round {round}: reader never observed post-set state",
1646            );
1647            round += 1;
1648        }
1649        assert!(round > 0, "test loop never executed a round");
1650    }
1651
1652    // Many writer tasks hammer `add_drb_result` for the same epoch with
1653    // distinct DRBs. While they do, reader tasks call `lookup_leader`,
1654    // which must always succeed once the randomized committee is set —
1655    // the writer overwrites the entry but never removes it. After the
1656    // writers drain, the entry must still be present and queryable.
1657    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1658    async fn concurrent_add_drb_result_same_epoch() {
1659        let committees = build_committees(4);
1660        let epoch = EpochNumber::new(1);
1661        // `has_randomized_stake_table` and `lookup_leader` both
1662        // require `first_epoch` to be set.
1663        committees.set_first_epoch(epoch, [0u8; 32]);
1664
1665        let stop = Arc::new(AtomicBool::new(false));
1666        let writes = Arc::new(AtomicUsize::new(0));
1667        let lookups = Arc::new(AtomicUsize::new(0));
1668
1669        let mut writers = JoinSet::new();
1670        let mut readers = JoinSet::new();
1671
1672        // Readers: lookup_leader must always succeed for an epoch
1673        // whose randomized committee has been populated, even while it
1674        // is being overwritten.
1675        for _ in 0..4 {
1676            let c = committees.clone();
1677            let stop = Arc::clone(&stop);
1678            let lookups = Arc::clone(&lookups);
1679            readers.spawn(async move {
1680                let view = ViewNumber::new(0);
1681                while !stop.load(Ordering::Relaxed) {
1682                    c.snapshot(epoch)
1683                        .expect("snapshot")
1684                        .lookup_leader(view)
1685                        .expect("randomized committee must remain present once set");
1686                    lookups.fetch_add(1, Ordering::Relaxed);
1687                    tokio::task::yield_now().await;
1688                }
1689            });
1690        }
1691
1692        // Writers: each task overwrites the randomized committee with
1693        // a unique DRB derived from its task id and iteration. Loops
1694        // until stop so the contention window matches `TEST_DURATION`.
1695        for tid in 0..8u8 {
1696            let c = committees.clone();
1697            let stop = Arc::clone(&stop);
1698            let writes = Arc::clone(&writes);
1699            writers.spawn(async move {
1700                let mut i: u64 = 0;
1701                while !stop.load(Ordering::Relaxed) {
1702                    let mut drb = [tid; 32];
1703                    drb[0] = (i & 0xFF) as u8;
1704                    c.add_drb_result(epoch, drb);
1705                    writes.fetch_add(1, Ordering::Relaxed);
1706                    if i.is_multiple_of(16) {
1707                        tokio::task::yield_now().await;
1708                    }
1709                    i += 1;
1710                }
1711            });
1712        }
1713
1714        tokio::time::sleep(TEST_DURATION).await;
1715        stop.store(true, Ordering::Relaxed);
1716        while let Some(res) = writers.join_next().await {
1717            res.expect("writer panicked");
1718        }
1719        while let Some(res) = readers.join_next().await {
1720            res.expect("reader panicked");
1721        }
1722
1723        assert!(writes.load(Ordering::Relaxed) > 0, "writers never advanced",);
1724        assert!(
1725            lookups.load(Ordering::Relaxed) > 0,
1726            "readers never observed the randomized committee",
1727        );
1728        let snap = committees
1729            .snapshot(epoch)
1730            .expect("randomized committee must survive concurrent writes");
1731        assert!(snap.has_drb(), "randomized committee must remain present");
1732        let view = ViewNumber::new(0);
1733        let _leader = snap
1734            .lookup_leader(view)
1735            .expect("lookup_leader succeeds when randomized committee is present");
1736    }
1737
1738    // Build an epoch-root header for `epoch_height = 100`. Block height
1739    // 95 satisfies `is_epoch_root(95, 100)` and produces target epoch 3
1740    // when passed to `add_epoch_root`.
1741    async fn build_epoch_root_header() -> Header {
1742        let instance = NodeState::mock_v2();
1743        let tx = Transaction::of_size(10);
1744        let (payload, _) = Payload::from_transactions([tx], &instance.genesis_state, &instance)
1745            .await
1746            .expect("payload");
1747        let metadata = payload.ns_table().clone();
1748        let header = Header::genesis(&instance, payload, &metadata, MOCK_UPGRADE.base);
1749        match header {
1750            Header::V2(mut h) => {
1751                h.height = 95;
1752                Header::V2(h)
1753            },
1754            other => panic!("expected V2 header from NodeState::mock_v2, got {other:?}"),
1755        }
1756    }
1757
1758    // `add_epoch_root` mutates `state[epoch]` and `all_validators[epoch]`
1759    // inside one `inner.write()` block. A reader observing both fields
1760    // under one read lock must see them flip together: pre-state
1761    // (`state[epoch].header == None`, `all_validators[epoch]` absent)
1762    // or post-state (`header == Some(_)`, `all_validators[epoch]`
1763    // present). A torn snapshot in either direction would indicate the
1764    // mutations leaked outside the single write lock.
1765    //
1766    // We pre-populate `state[epoch]` with `block_reward` and
1767    // `stake_table_hash` set so `add_epoch_root` reuses the validators
1768    // already in memory and skips the L1 fetch (the mock fetcher
1769    // points at a non-existent RPC endpoint and would fail). This
1770    // still drives the inner.write block we want to verify.
1771    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1772    async fn add_epoch_root_is_atomic() {
1773        let header = build_epoch_root_header().await;
1774        let target = EpochNumber::new(3);
1775
1776        fn snapshot(c: &EpochCommittees, e: EpochNumber) -> (bool, bool) {
1777            let inner = c.inner.read();
1778            let header_set = inner
1779                .snapshots
1780                .get(&e)
1781                .map(|s| s.inner.committee.header.is_some())
1782                .unwrap_or(false);
1783            let all_validators_present = inner.all_validators.contains_key(&e);
1784            (header_set, all_validators_present)
1785        }
1786
1787        // Each round is one race attempt against `add_epoch_root`. Loop
1788        // for `TEST_DURATION` to widen the window for catching a torn
1789        // observation across the inner.write block.
1790        let test_start = tokio::time::Instant::now();
1791        let mut round: u64 = 0;
1792        while test_start.elapsed() < TEST_DURATION {
1793            let committees = build_committees(4);
1794
1795            // Pre-populate the snapshot for `target` with block_reward and
1796            // stake_table_hash but no header. This lands `add_epoch_root`
1797            // on the second match arm (no L1 fetch) but still drives
1798            // the inner.write() mutation.
1799            {
1800                let mut inner = committees.inner.write();
1801                let template = inner
1802                    .epoch_committee(EpochNumber::genesis())
1803                    .expect("genesis committee exists");
1804                let prefilled = EpochCommittee {
1805                    block_reward: Some(RewardAmount::default()),
1806                    stake_table_hash: Some(StakeTableState::default().commit()),
1807                    header: None,
1808                    eligible_leaders: template.eligible_leaders.clone(),
1809                    stake_table: template.stake_table.clone(),
1810                    validators: template.validators.clone(),
1811                    address_mapping: template.address_mapping.clone(),
1812                };
1813                inner.put_epoch_committee(target, Arc::new(prefilled));
1814            }
1815
1816            let stop = Arc::new(AtomicBool::new(false));
1817            let post = Arc::new(AtomicUsize::new(0));
1818
1819            let reader = {
1820                let c = committees.clone();
1821                let stop = Arc::clone(&stop);
1822                let post = Arc::clone(&post);
1823                tokio::spawn(async move {
1824                    while !stop.load(Ordering::Relaxed) {
1825                        match snapshot(&c, target) {
1826                            (false, false) => {}, // pre-state
1827                            (true, true) => {
1828                                post.fetch_add(1, Ordering::Relaxed);
1829                            },
1830                            torn => panic!(
1831                                "round {round}: torn snapshot for epoch {target}: header_set={}, \
1832                                 all_validators_present={}",
1833                                torn.0, torn.1,
1834                            ),
1835                        }
1836                        tokio::task::yield_now().await;
1837                    }
1838                })
1839            };
1840
1841            // Brief warmup so the reader is in its loop before the
1842            // mutation lands.
1843            tokio::time::sleep(Duration::from_millis(2)).await;
1844            committees
1845                .add_epoch_root(header.clone())
1846                .await
1847                .expect("add_epoch_root should succeed for the prefilled state");
1848
1849            let deadline = tokio::time::Instant::now() + Duration::from_millis(200);
1850            while tokio::time::Instant::now() < deadline && post.load(Ordering::Relaxed) == 0 {
1851                tokio::task::yield_now().await;
1852            }
1853            stop.store(true, Ordering::Relaxed);
1854            reader.await.expect("reader panicked");
1855            assert!(
1856                post.load(Ordering::Relaxed) > 0,
1857                "round {round}: reader never observed post-state",
1858            );
1859            round += 1;
1860        }
1861        assert!(round > 0, "test loop never executed a round");
1862    }
1863
1864    // `add_da_committee` updates the per-epoch `da_committees` map and
1865    // must retroactively rebuild every loaded snapshot whose epoch
1866    // resolves to the new committee. Without the rebuild, snapshots
1867    // baked the old DA at construction time and reads would silently
1868    // observe stale data until `add_epoch_root` rebuilt them.
1869    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1870    async fn add_da_committee_rebuilds_affected_snapshots() {
1871        fn da_keys(c: &EpochCommittees, e: EpochNumber) -> Vec<PubKey> {
1872            c.snapshot(e)
1873                .expect("snapshot")
1874                .da_stake_table()
1875                .map(|p| PubKey::public_key(&p.stake_table_entry))
1876                .collect()
1877        }
1878
1879        // `EpochNumber::genesis()` is 1, so `new_stake` seeds snapshots
1880        // for epochs 1 and 2. `set_first_epoch(2, ...)` then rebuilds
1881        // snapshots for epochs 2 and 3, all with the bootstrap DA.
1882        let committees = build_committees(4);
1883        committees.set_first_epoch(EpochNumber::new(2), [0u8; 32]);
1884
1885        let initial_e2 = da_keys(&committees, EpochNumber::new(2));
1886        let initial_e3 = da_keys(&committees, EpochNumber::new(3));
1887
1888        let new_da: Vec<PeerConfig<SeqTypes>> = (0..2)
1889            .map(|i| {
1890                ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1891                    [123u8; 32],
1892                    i,
1893                    U256::from(50),
1894                    true,
1895                )
1896                .public_config()
1897            })
1898            .collect();
1899        let new_da_keys: Vec<PubKey> = new_da
1900            .iter()
1901            .map(|p| PubKey::public_key(&p.stake_table_entry))
1902            .collect();
1903        assert_ne!(
1904            new_da_keys, initial_e2,
1905            "test setup: new DA must differ from initial"
1906        );
1907
1908        // Apply the new DA starting at epoch 2. Snapshot(2) and
1909        // snapshot(3) were seeded with the bootstrap DA; both must
1910        // observe the new DA after this call. Snapshot(1) lies before
1911        // `first_epoch=2`, so it stays on the bootstrap DA.
1912        committees.add_da_committee(EpochNumber::new(2), new_da);
1913
1914        assert_eq!(
1915            da_keys(&committees, EpochNumber::new(2)),
1916            new_da_keys,
1917            "snapshot(2) must reflect the new DA",
1918        );
1919        assert_eq!(
1920            da_keys(&committees, EpochNumber::new(3)),
1921            new_da_keys,
1922            "snapshot(3) must reflect the new DA",
1923        );
1924        assert_eq!(
1925            da_keys(&committees, EpochNumber::new(1)),
1926            initial_e2,
1927            "snapshot(1) lies before first_epoch=2 and must keep the bootstrap DA",
1928        );
1929        // Sanity: the original epoch-2 and epoch-3 snapshots both used
1930        // the bootstrap DA before the call. If a future change to
1931        // `set_first_epoch` makes these diverge, this assertion will
1932        // catch it so the test setup can be revisited.
1933        assert_eq!(initial_e2, initial_e3);
1934    }
1935
1936    // A `add_da_committee` call with a `first_epoch` greater than every
1937    // entry in an existing layered map must only rebuild snapshots in
1938    // its own range, not earlier ranges owned by other committees.
1939    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1940    async fn add_da_committee_layered_does_not_rebuild_earlier_ranges() {
1941        fn da_keys(c: &EpochCommittees, e: EpochNumber) -> Vec<PubKey> {
1942            c.snapshot(e)
1943                .expect("snapshot")
1944                .da_stake_table()
1945                .map(|p| PubKey::public_key(&p.stake_table_entry))
1946                .collect()
1947        }
1948
1949        let committees = build_committees(4);
1950        committees.set_first_epoch(EpochNumber::new(1), [0u8; 32]);
1951
1952        // Pre-populate snapshots for epochs 2..6 (set_first_epoch only
1953        // covers 1 and 2, but the range query needs more epochs to
1954        // exercise the layered case).
1955        {
1956            let mut inner = committees.inner.write();
1957            let template = inner
1958                .epoch_committee(EpochNumber::genesis())
1959                .expect("genesis committee exists")
1960                .clone();
1961            for e in 3..6 {
1962                inner.put_epoch_committee(EpochNumber::new(e), template.clone());
1963            }
1964        }
1965
1966        let da_b: Vec<PeerConfig<SeqTypes>> = (0..2)
1967            .map(|i| {
1968                ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1969                    [200u8; 32],
1970                    i,
1971                    U256::from(50),
1972                    true,
1973                )
1974                .public_config()
1975            })
1976            .collect();
1977        let da_b_keys: Vec<PubKey> = da_b
1978            .iter()
1979            .map(|p| PubKey::public_key(&p.stake_table_entry))
1980            .collect();
1981
1982        let da_c: Vec<PeerConfig<SeqTypes>> = (0..2)
1983            .map(|i| {
1984                ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1985                    [201u8; 32],
1986                    i,
1987                    U256::from(50),
1988                    true,
1989                )
1990                .public_config()
1991            })
1992            .collect();
1993        let da_c_keys: Vec<PubKey> = da_c
1994            .iter()
1995            .map(|p| PubKey::public_key(&p.stake_table_entry))
1996            .collect();
1997
1998        // Insert C first at epoch 5, then B at epoch 3. Range [3, 5)
1999        // must be rebuilt with B; [5, ∞) must keep C.
2000        committees.add_da_committee(EpochNumber::new(5), da_c);
2001        committees.add_da_committee(EpochNumber::new(3), da_b);
2002
2003        assert_eq!(da_keys(&committees, EpochNumber::new(3)), da_b_keys);
2004        assert_eq!(da_keys(&committees, EpochNumber::new(4)), da_b_keys);
2005        assert_eq!(
2006            da_keys(&committees, EpochNumber::new(5)),
2007            da_c_keys,
2008            "epoch 5 must keep C — it is outside B's range",
2009        );
2010    }
2011}