1use std::{
2 collections::{BTreeMap, HashMap},
3 ops::Bound,
4 sync::Arc,
5};
6
7use alloy::primitives::{Address, U256};
8use anyhow::{Context, bail};
9use async_lock::Mutex as AsyncMutex;
10use committable::Commitment;
11use hotshot::types::{BLSPubKey, SignatureKey as _};
12use hotshot_types::{
13 PeerConfig, PeerConnectInfo,
14 data::{BlockNumber, EpochNumber, ViewNumber},
15 drb::{
16 DrbResult,
17 election::{RandomizedCommittee, generate_stake_cdf, select_randomized_leader},
18 },
19 epoch_membership::EpochMembershipCoordinator,
20 stake_table::{HSStakeTable, StakeTableEntry},
21 traits::{
22 block_contents::BlockHeader,
23 election::{Membership, MembershipSnapshot, NonEpochMembershipSnapshot},
24 signature_key::StakeTableEntryType,
25 },
26 utils::{
27 epoch_from_block_number, is_epoch_root, root_block_in_epoch, transition_block_for_epoch,
28 },
29};
30use indexmap::IndexMap;
31use parking_lot::RwLock;
32use thiserror::Error;
33use tracing::{debug, error, info, warn};
34use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
35
36use super::{
37 AuthenticatedValidatorMap, RegisteredValidatorMap, StakeTableHash, StakeTableState,
38 compute_block_reward,
39};
40use crate::{
41 Header, Leaf2, PubKey, SeqTypes,
42 traits::StateCatchup,
43 v0_3::{ASSUMED_BLOCK_TIME_SECONDS, AuthenticatedValidator, Fetcher, RewardAmount},
44};
45
46#[derive(Clone, Debug)]
48pub struct EpochCommittees {
49 inner: Arc<RwLock<Inner>>,
50 fetcher: Arc<Fetcher>,
51 update_fixed_block_reward_lock: Arc<AsyncMutex<()>>,
52 epoch_height: BlockNumber,
53}
54
55#[derive(Debug)]
56struct Inner {
57 non_epoch_snapshot: NonEpochSnapshot,
62
63 snapshots: BTreeMap<EpochNumber, EpochSnapshot>,
67
68 all_validators: BTreeMap<EpochNumber, RegisteredValidatorMap>,
70
71 da_committees: BTreeMap<EpochNumber, Arc<DaCommittee>>,
75
76 first_epoch: Option<EpochNumber>,
77
78 fixed_block_reward: Option<RewardAmount>,
82}
83
84#[derive(Clone, Debug)]
85struct DaCommittee {
86 committee: Vec<PeerConfig<SeqTypes>>,
87 indexed_committee: HashMap<PubKey, PeerConfig<SeqTypes>>,
88}
89
90#[derive(Debug)]
92struct NonEpochCommittee {
93 eligible_leaders: Vec<PeerConfig<SeqTypes>>,
98
99 stake_table: Vec<PeerConfig<SeqTypes>>,
101
102 indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,
104}
105
106#[derive(Debug)]
108struct EpochCommittee {
109 eligible_leaders: Vec<PeerConfig<SeqTypes>>,
114 stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
116 validators: AuthenticatedValidatorMap,
117 address_mapping: HashMap<BLSPubKey, Address>,
118 block_reward: Option<RewardAmount>,
119 stake_table_hash: Option<StakeTableHash>,
120 header: Option<Header>,
121}
122
123impl EpochCommittee {
124 fn new(
125 validators: AuthenticatedValidatorMap,
126 block_reward: Option<RewardAmount>,
127 hash: Option<StakeTableHash>,
128 header: Option<Header>,
129 ) -> Self {
130 let mut address_mapping = HashMap::new();
131 let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
132 .values()
133 .map(|v| {
134 address_mapping.insert(v.stake_table_key, v.account);
135 (
136 v.stake_table_key,
137 PeerConfig {
138 stake_table_entry: BLSPubKey::stake_table_entry(
139 &v.stake_table_key,
140 v.stake,
141 ),
142 state_ver_key: v.state_ver_key.clone(),
143 connect_info: v.x25519_key.and_then(|p| {
144 let a = v.p2p_addr.clone()?;
145 Some(PeerConnectInfo {
146 x25519_key: p,
147 p2p_addr: a,
148 })
149 }),
150 },
151 )
152 })
153 .collect();
154
155 let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
156 stake_table.iter().map(|(_, l)| l.clone()).collect();
157
158 Self {
159 eligible_leaders,
160 stake_table,
161 validators,
162 address_mapping,
163 block_reward,
164 stake_table_hash: hash,
165 header,
166 }
167 }
168}
169
170impl EpochCommittees {
171 pub fn epoch_height(&self) -> BlockNumber {
172 self.epoch_height
173 }
174
175 pub fn first_epoch(&self) -> Option<EpochNumber> {
176 self.inner.read().first_epoch
177 }
178
179 pub fn fetcher(&self) -> &Fetcher {
180 &self.fetcher
181 }
182
183 pub fn fixed_block_reward(&self) -> Option<RewardAmount> {
184 self.inner.read().fixed_block_reward
185 }
186
187 pub fn latest_peer_config(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
192 let inner = self.inner.read();
193 for snap in inner.snapshots.values().rev() {
194 if let Some(cfg) = snap.inner.committee.stake_table.get(key) {
195 return Some(cfg.clone());
196 }
197 }
198 inner
199 .non_epoch_snapshot
200 .inner
201 .committee
202 .indexed_stake_table
203 .get(key)
204 .cloned()
205 }
206
207 async fn fetch_and_update_fixed_block_reward(
212 &self,
213 epoch: EpochNumber,
214 ) -> anyhow::Result<RewardAmount> {
215 let _guard = self.update_fixed_block_reward_lock.lock().await;
217
218 #[allow(clippy::significant_drop_in_scrutinee)]
227 if let Some(reward) = self.inner.read().fixed_block_reward {
228 Ok(reward)
229 } else {
230 warn!(%epoch,
231 "Block reward is None. attempting to fetch it from L1",
232 );
233 let block_reward =
234 self.fetcher
235 .fetch_fixed_block_reward()
236 .await
237 .inspect_err(|err| {
238 error!(?epoch, ?err, "failed to fetch block_reward");
239 })?;
240 self.inner.write().fixed_block_reward = Some(block_reward);
241 Ok(block_reward)
242 }
243 }
244
245 async fn calculate_dynamic_block_reward(
252 &self,
253 epoch: EpochNumber,
254 header: &Header,
255 validators: &AuthenticatedValidatorMap,
256 ) -> anyhow::Result<Option<RewardAmount>> {
257 let epoch_height = self.epoch_height;
258 let current_epoch = epoch_from_block_number(header.height(), *epoch_height);
259 let previous_epoch = current_epoch
260 .checked_sub(1)
261 .context("underflow: cannot get previous epoch when current_epoch is 0")?;
262 debug!(?epoch, "previous_epoch={previous_epoch:?}");
263
264 let first_epoch = *self.first_epoch().context("first epoch is None")?;
265
266 if previous_epoch > first_epoch + 1 && self.snapshot(previous_epoch.into()).is_none() {
269 warn!(?previous_epoch, "missing stake table for previous epoch");
270 return Ok(None);
271 }
272
273 let previous_reward_distributed = header
274 .total_reward_distributed()
275 .context("Invalid block header: missing total_reward_distributed field")?;
276
277 let total_stake: U256 = validators.values().map(|v| v.stake).sum();
279 let initial_supply = *self.fetcher.initial_supply.read().await;
280 let initial_supply = match initial_supply {
281 Some(supply) => supply,
282 None => self.fetcher.fetch_and_update_initial_supply().await?,
283 };
284 let total_supply = initial_supply
285 .checked_add(previous_reward_distributed.0)
286 .context("initial_supply + previous_reward_distributed overflow")?;
287
288 let curr_ts = header.timestamp_millis_internal();
290 debug!(?epoch, "curr_ts={curr_ts:?}");
291
292 let average_block_time_ms = if previous_epoch <= first_epoch + 1 {
296 ASSUMED_BLOCK_TIME_SECONDS as u64 * 1000 } else {
298 let next_epoch = epoch
302 .checked_sub(1)
303 .context("underflow: cannot get next epoch when epoch is 0")?;
304 let prev_ts = match self.map_header(next_epoch, |h| h.timestamp_millis_internal()) {
305 Some(ts) => ts,
306 None => {
307 info!(
308 "Calculating rewards for epoch {}, we have no root leaf header for epoch \
309 - 1. Fetching from peers",
310 epoch
311 );
312
313 let root_height = header.height().checked_sub(*epoch_height).context(
314 "Epoch height is greater than block height. cannot compute previous epoch \
315 root height",
316 )?;
317
318 let prev_snapshot = self
319 .snapshot(EpochNumber::new(previous_epoch))
320 .context("Stake table not found")?;
321 let prev_stake_table =
322 HSStakeTable(prev_snapshot.stake_table().cloned().collect());
323 let success_threshold = prev_snapshot.success_threshold();
324
325 self.fetcher
326 .peers
327 .fetch_leaf(root_height, prev_stake_table, success_threshold)
328 .await
329 .context("Epoch root leaf not found")?
330 .block_header()
331 .timestamp_millis_internal()
332 },
333 };
334
335 let time_diff = curr_ts.checked_sub(prev_ts).context(
336 "Current timestamp is earlier than previous. underflow in block time calculation",
337 )?;
338
339 time_diff
340 .checked_div(*epoch_height)
341 .context("Epoch height is zero. cannot compute average block time")?
342 };
343 info!(?epoch, %total_supply, %total_stake, %average_block_time_ms,
344 "dynamic block reward parameters");
345
346 let block_reward =
347 compute_block_reward(epoch, total_supply, total_stake, average_block_time_ms)?;
348
349 Ok(Some(block_reward))
350 }
351
352 pub fn epoch_block_reward(&self, epoch: EpochNumber) -> Option<RewardAmount> {
354 self.inner
355 .read()
356 .epoch_committee(epoch)
357 .and_then(|committee| committee.block_reward)
358 }
359
360 pub fn get_validator_index(&self, epoch: EpochNumber, bls_key: &PubKey) -> Option<usize> {
365 self.inner
366 .read()
367 .epoch_committee(epoch)
368 .and_then(|committee| committee.stake_table.get_index_of(bls_key))
369 }
370
371 pub fn active_validators(&self, e: EpochNumber) -> anyhow::Result<AuthenticatedValidatorMap> {
372 self.inner.read().active_validators(e)
373 }
374
375 pub fn address(&self, e: EpochNumber, key: &BLSPubKey) -> anyhow::Result<Address> {
376 self.inner.read().address(e, key)
377 }
378
379 pub fn get_validator_config(
380 &self,
381 epoch: EpochNumber,
382 key: &BLSPubKey,
383 ) -> anyhow::Result<AuthenticatedValidator<BLSPubKey>> {
384 let inner = self.inner.read();
385 let address = inner.address(epoch, key)?;
386 let validators = inner.active_validators(epoch)?;
387 validators
388 .get(&address)
389 .context("validator not found")
390 .cloned()
391 }
392
393 pub fn new_stake<B: Into<BlockNumber>>(
395 committee_members: Vec<PeerConfig<SeqTypes>>,
398 da_members: Vec<PeerConfig<SeqTypes>>,
399 fixed_block_reward: Option<RewardAmount>,
400 fetcher: Fetcher,
401 epoch_height: B,
402 ) -> Self {
403 let stake_table: Vec<_> = committee_members
405 .iter()
406 .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
407 .cloned()
408 .collect();
409
410 let eligible_leaders = stake_table.clone();
411 let da_members: Vec<_> = da_members
413 .iter()
414 .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
415 .cloned()
416 .collect();
417
418 let indexed_stake_table: HashMap<PubKey, _> = stake_table
420 .iter()
421 .map(|peer_config| {
422 (
423 PubKey::public_key(&peer_config.stake_table_entry),
424 peer_config.clone(),
425 )
426 })
427 .collect();
428
429 let indexed_da_members: HashMap<PubKey, _> = da_members
431 .iter()
432 .map(|peer_config| {
433 (
434 PubKey::public_key(&peer_config.stake_table_entry),
435 peer_config.clone(),
436 )
437 })
438 .collect();
439
440 let da_committee = Arc::new(DaCommittee {
441 committee: da_members,
442 indexed_committee: indexed_da_members,
443 });
444
445 let members = Arc::new(NonEpochCommittee {
446 eligible_leaders,
447 stake_table,
448 indexed_stake_table,
449 });
450
451 let non_epoch_snapshot = NonEpochSnapshot::new(members.clone(), da_committee.clone());
452
453 let epoch_committee = Arc::new(EpochCommittee {
454 eligible_leaders: members.eligible_leaders.clone(),
455 stake_table: members
456 .stake_table
457 .iter()
458 .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
459 .collect(),
460 validators: Default::default(),
461 address_mapping: HashMap::new(),
462 block_reward: Default::default(),
463 stake_table_hash: None,
464 header: None,
465 });
466
467 let mut snapshots = BTreeMap::new();
468 snapshots.insert(
469 EpochNumber::genesis(),
470 EpochSnapshot::new(
471 EpochNumber::genesis(),
472 None,
473 epoch_committee.clone(),
474 None,
475 da_committee.clone(),
476 ),
477 );
478 snapshots.insert(
480 EpochNumber::genesis() + 1u64,
481 EpochSnapshot::new(
482 EpochNumber::genesis() + 1u64,
483 None,
484 epoch_committee,
485 None,
486 da_committee,
487 ),
488 );
489
490 Self {
491 inner: Arc::new(RwLock::new(Inner {
492 non_epoch_snapshot,
493 da_committees: BTreeMap::new(),
494 snapshots,
495 all_validators: BTreeMap::new(),
496 first_epoch: None,
497 fixed_block_reward,
498 })),
499 fetcher: Arc::new(fetcher),
500 update_fixed_block_reward_lock: Arc::new(AsyncMutex::new(())),
501 epoch_height: epoch_height.into(),
502 }
503 }
504
505 pub async fn reload_stake(&mut self, limit: u64) {
506 match self.fetcher.fetch_fixed_block_reward().await {
507 Ok(block_reward) => {
508 info!("Fetched block reward: {block_reward}");
509 self.inner.write().fixed_block_reward = Some(block_reward);
510 },
511 Err(err) => {
512 warn!("Failed to fetch the block reward when reloading the stake tables: {err}");
513 },
514 }
515
516 let loaded_stake = match self
518 .fetcher
519 .persistence
520 .lock()
521 .await
522 .load_latest_stake(limit)
523 .await
524 {
525 Ok(Some(loaded)) => loaded,
526 Ok(None) => {
527 warn!("No stake table history found in persistence!");
528 return;
529 },
530 Err(e) => {
531 error!("Failed to load stake table history from persistence: {e}");
532 return;
533 },
534 };
535
536 for (epoch, (validators, block_reward), stake_table_hash) in loaded_stake {
537 let committee = EpochCommittee::new(validators, block_reward, stake_table_hash, None);
538 self.inner
539 .write()
540 .put_epoch_committee(epoch, Arc::new(committee));
541 }
542 }
543
544 fn map_header<E, F, R>(&self, epoch: E, f: F) -> Option<R>
546 where
547 E: Into<EpochNumber>,
548 F: FnMut(&Header) -> R,
549 {
550 self.inner
551 .read()
552 .epoch_committee(epoch.into())
553 .and_then(|committee| committee.header.as_ref().map(f))
554 }
555
556 fn randomized_committee(
557 &self,
558 epoch: EpochNumber,
559 drb: DrbResult,
560 ) -> Option<RandomizedCommittee<StakeTableEntry<PubKey>>> {
561 let inner = self.inner.read();
562 let Some(raw_stake_table) = inner.epoch_committee(epoch) else {
563 error!(
564 "randomized_committee({epoch}, {drb:?}) was called, but we do not yet have the \
565 stake table for epoch {epoch}"
566 );
567 return None;
568 };
569
570 let leaders = raw_stake_table
571 .eligible_leaders
572 .clone()
573 .into_iter()
574 .map(|peer_config| peer_config.stake_table_entry)
575 .collect::<Vec<_>>();
576
577 Some(generate_stake_cdf(leaders, drb))
578 }
579}
580
581pub async fn fetch_and_calculate_block_reward(
591 coordinator: EpochMembershipCoordinator<SeqTypes>,
592 current_epoch: EpochNumber,
593) -> anyhow::Result<RewardAmount> {
594 let committee;
595 let first_epoch;
596 let fixed_block_reward;
597 {
598 let membership = coordinator.membership().inner.read();
599 fixed_block_reward = membership.fixed_block_reward;
600
601 committee = membership
602 .epoch_committee(current_epoch)
603 .context(format!("committee not found for epoch={current_epoch:?}"))?
604 .clone();
605
606 if let Some(reward) = committee.block_reward {
608 return Ok(reward);
609 }
610
611 first_epoch = membership.first_epoch.context(format!(
612 "First epoch not initialized (current_epoch={current_epoch})"
613 ))?;
614 }
615
616 if *current_epoch <= *first_epoch + 1 {
617 bail!(
618 "epoch is in first two epochs: current_epoch={current_epoch}, \
619 first_epoch={first_epoch}"
620 );
621 }
622
623 let header = match committee.header.clone() {
624 Some(header) => header,
625 None => {
626 let root_epoch = current_epoch.checked_sub(2).context(format!(
627 "Epoch calculation underflow (current_epoch={current_epoch})"
628 ))?;
629
630 info!(?root_epoch, "catchup epoch root header");
631
632 let leaf = coordinator
633 .membership()
634 .get_epoch_root(EpochNumber::new(root_epoch))
635 .await
636 .with_context(|| format!("Failed to get epoch root for root_epoch={root_epoch}"))?;
637 leaf.block_header().clone()
638 },
639 };
640
641 if header.version() <= EPOCH_VERSION {
642 return fixed_block_reward.context(format!(
643 "Fixed block reward not found for current_epoch={current_epoch}"
644 ));
645 }
646
647 let prev_epoch_u64 = current_epoch.checked_sub(1).context(format!(
648 "Underflow: cannot compute previous epoch when current_epoch={current_epoch}"
649 ))?;
650
651 let prev_epoch = EpochNumber::new(prev_epoch_u64);
652
653 if *prev_epoch > *first_epoch + 1
656 && let Err(err) = coordinator.stake_table_for_epoch(Some(prev_epoch))
657 {
658 info!("failed to get membership for epoch={prev_epoch:?}: {err:#}");
659
660 coordinator
661 .wait_for_catchup(prev_epoch)
662 .await
663 .context(format!("failed to catch up for epoch={prev_epoch}"))?;
664 }
665
666 coordinator
667 .membership()
668 .calculate_dynamic_block_reward(current_epoch, &header, &committee.validators)
669 .await
670 .with_context(|| {
671 format!("dynamic block reward calculation failed for epoch={current_epoch}")
672 })?
673 .with_context(|| format!("dynamic block reward returned None. epoch={current_epoch}"))
674}
675
676impl Membership<SeqTypes> for EpochCommittees {
677 type Error = EpochCommitteesError;
678 type Snapshot = EpochSnapshot;
679 type NonEpochSnapshot = NonEpochSnapshot;
680
681 fn snapshot(&self, epoch: EpochNumber) -> Option<Self::Snapshot> {
682 self.inner.read().snapshots.get(&epoch).cloned()
683 }
684
685 fn non_epoch_snapshot(&self) -> Self::NonEpochSnapshot {
686 self.inner.read().non_epoch_snapshot.clone()
687 }
688
689 async fn add_epoch_root(&self, block_header: Header) -> Result<(), Self::Error> {
693 let block_number = block_header.block_number();
694
695 let epoch_height = *self.epoch_height;
696
697 let epoch = EpochNumber::new(epoch_from_block_number(block_number, epoch_height) + 2);
698
699 info!(?epoch, "adding epoch root. height={:?}", block_number);
700
701 if !is_epoch_root(block_number, epoch_height) {
702 error!(
703 "`add_epoch_root` was called with a block header that was not the root block for \
704 an epoch. This should never happen. Header:\n\n{block_header:?}"
705 );
706 return Err(Self::Error::NoRootBlock(block_number.into()));
707 }
708
709 let version = block_header.version();
710 self.fetcher
712 .update_chain_config(&block_header)
713 .await
714 .map_err(Self::Error::Fetcher)?;
715
716 let mut block_reward = None;
717 if version == EPOCH_VERSION {
720 let reward = self
721 .fetch_and_update_fixed_block_reward(epoch)
722 .await
723 .map_err(Self::Error::Fetcher)?;
724 block_reward = Some(reward);
725 }
726
727 let epoch_committee = self.inner.read().epoch_committee(epoch).cloned();
728
729 let (active_validators, all_validators, stake_table_hash) = match epoch_committee {
738 Some(committee)
739 if committee.block_reward.is_some()
740 && committee.header.is_some()
741 && committee.stake_table_hash.is_some() =>
742 {
743 info!(
744 ?epoch,
745 "committee already has block reward, header, and stake table hash; skipping \
746 add_epoch_root"
747 );
748 return Ok(());
749 },
750
751 Some(committee) => {
752 if let Some(reward) = committee.block_reward {
753 block_reward = Some(reward);
754 }
755
756 if let Some(hash) = committee.stake_table_hash {
757 (committee.validators.clone(), Default::default(), Some(hash))
758 } else {
759 info!(
761 "Stake table hash missing for epoch {epoch}. recalculating by fetching \
762 from l1."
763 );
764 let set = self
765 .fetcher
766 .fetch(epoch, &block_header)
767 .await
768 .map_err(Self::Error::Fetcher)?;
769 (
770 set.active_validators,
771 set.all_validators,
772 set.stake_table_hash,
773 )
774 }
775 },
776
777 None => {
778 info!("Stake table missing for epoch {epoch}. Fetching from L1.");
779 let set = self
780 .fetcher
781 .fetch(epoch, &block_header)
782 .await
783 .map_err(Self::Error::Fetcher)?;
784 (
785 set.active_validators,
786 set.all_validators,
787 set.stake_table_hash,
788 )
789 },
790 };
791
792 if block_reward.is_none() && version >= DRB_AND_HEADER_UPGRADE_VERSION {
796 info!(?epoch, "calculating dynamic block reward");
797 let reward = self
798 .calculate_dynamic_block_reward(epoch, &block_header, &active_validators)
799 .await
800 .map_err(Self::Error::Reward)?;
801
802 info!(?epoch, "calculated dynamic block reward = {reward:?}");
803 block_reward = reward;
804 }
805
806 let committee = EpochCommittee::new(
807 active_validators.clone(),
808 block_reward,
809 stake_table_hash,
810 Some(block_header.clone()),
811 );
812
813 let previous_epoch;
814 let previous_committee;
815 let previous_validators;
816 {
817 let mut inner = self.inner.write();
818 inner.put_epoch_committee(epoch, Arc::new(committee));
819 previous_epoch = EpochNumber::new(epoch.saturating_sub(1));
822 previous_committee = inner.epoch_committee(previous_epoch).cloned();
823 inner.all_validators = inner.all_validators.split_off(&previous_epoch);
825 previous_validators = inner.all_validators.remove(&previous_epoch);
827 inner.all_validators.insert(epoch, all_validators.clone());
828 }
829
830 let persistence_lock = self.fetcher.persistence.lock().await;
831
832 let decided_hash = block_header.next_stake_table_hash();
833
834 if let Some(previous_committee) = previous_committee {
841 if decided_hash.is_none() || decided_hash == previous_committee.stake_table_hash {
842 if let Err(e) = persistence_lock
843 .store_stake(
844 previous_epoch,
845 previous_committee.validators.clone(),
846 previous_committee.block_reward,
847 previous_committee.stake_table_hash,
848 )
849 .await
850 {
851 error!(
852 ?e,
853 ?previous_epoch,
854 "`add_epoch_root`, error storing stake table"
855 );
856 }
857
858 if let Some(previous_validators) = previous_validators
859 && let Err(e) = persistence_lock
860 .store_all_validators(previous_epoch, previous_validators)
861 .await
862 {
863 error!(?e, ?epoch, "`add_epoch_root`, error storing all validators");
864 }
865 } else {
866 panic!(
867 "The decided block header's `next_stake_table_hash` does not match the hash \
868 of the stake table we have. This is an unrecoverable error likely due to \
869 issues with your L1 RPC provider. Decided:\n\n{:?}Actual:\n\n{:?}",
870 decided_hash, previous_committee.stake_table_hash
871 );
872 }
873 }
874
875 Ok(())
876 }
877
878 async fn get_epoch_root(&self, epoch: EpochNumber) -> Result<Leaf2, Self::Error> {
879 let block_height = root_block_in_epoch(*epoch, *self.epoch_height());
880 let peers = self.fetcher.peers.clone();
881 let snapshot = self
882 .snapshot(epoch)
883 .ok_or_else(|| Self::Error::Message(format!("no committee for epoch={epoch}")))?;
884 let stake_table = HSStakeTable(snapshot.stake_table().cloned().collect());
885 let success_threshold = snapshot.success_threshold();
886 let leaf: Leaf2 = peers
887 .fetch_leaf(block_height, stake_table, success_threshold)
888 .await
889 .map_err(Self::Error::Catchup)?;
890 Ok(leaf)
891 }
892
893 async fn get_epoch_drb(&self, epoch: EpochNumber) -> Result<DrbResult, Self::Error> {
894 let peers = self.fetcher.peers.clone();
895
896 if let Some(snap) = self.snapshot(epoch)
898 && let Some(rand) = &snap.inner.randomized
899 {
900 return Ok(rand.drb_result());
901 }
902
903 let previous_epoch = match epoch.checked_sub(1) {
905 Some(epoch) => EpochNumber::new(epoch),
906 None => {
907 return self
908 .snapshot(epoch)
909 .and_then(|s| s.inner.randomized.as_ref().map(|r| r.drb_result()))
910 .ok_or_else(|| {
911 Self::Error::Message(format!(
912 "Missing randomized committee for epoch {epoch}"
913 ))
914 });
915 },
916 };
917
918 let prev_snapshot = self.snapshot(previous_epoch).ok_or_else(|| {
919 Self::Error::Message(format!("no committee for previous_epoch={previous_epoch}"))
920 })?;
921 let stake_table = HSStakeTable(prev_snapshot.stake_table().cloned().collect());
922 let success_threshold = prev_snapshot.success_threshold();
923
924 let block_height = transition_block_for_epoch(*previous_epoch, *self.epoch_height());
925
926 debug!(
927 "Getting DRB for epoch {}, block height {}",
928 epoch, block_height
929 );
930 let drb_leaf = peers
931 .try_fetch_leaf(1, block_height, stake_table, success_threshold)
932 .await
933 .map_err(Self::Error::Catchup)?;
934
935 let Some(drb) = drb_leaf.next_drb_result else {
936 error!(
937 "We received a leaf that should contain a DRB result, but the DRB result is \
938 missing: {:?}",
939 drb_leaf
940 );
941
942 return Err(Self::Error::Message(
943 "DRB leaf is missing the DRB result.".to_string(),
944 ));
945 };
946
947 Ok(drb)
948 }
949
950 fn add_drb_result(&self, epoch: EpochNumber, drb: DrbResult) {
951 info!("Adding DRB result {drb:?} to epoch {epoch}");
952 if let Some(committee) = self.randomized_committee(epoch, drb) {
953 self.inner
954 .write()
955 .put_randomized_committee(epoch, Arc::new(committee));
956 }
957 }
958
959 fn set_first_epoch(&self, epoch: EpochNumber, initial_drb_result: DrbResult) {
960 let rand_comm = Arc::new(
961 self.randomized_committee(EpochNumber::genesis(), initial_drb_result)
962 .expect("committee exist at genesis"),
963 );
964
965 let mut inner = self.inner.write();
966 inner.first_epoch = Some(epoch);
967
968 let epoch_committee = inner
969 .epoch_committee(EpochNumber::genesis())
970 .expect("committee exists at genesis")
971 .clone();
972
973 inner.put_epoch_committee(epoch, epoch_committee.clone());
976 inner.put_randomized_committee(epoch, rand_comm.clone());
977 inner.put_epoch_committee(epoch + 1, epoch_committee);
978 inner.put_randomized_committee(epoch + 1, rand_comm);
979 }
980
981 fn first_epoch(&self) -> Option<EpochNumber> {
982 self.inner.read().first_epoch
983 }
984
985 fn highest_known_epoch(&self) -> Option<EpochNumber> {
986 self.inner.read().snapshots.keys().max().copied()
987 }
988
989 fn add_da_committee(&self, first_epoch: EpochNumber, committee: Vec<PeerConfig<SeqTypes>>) {
990 let indexed_committee: HashMap<PubKey, _> = committee
991 .iter()
992 .map(|peer_config| {
993 (
994 PubKey::public_key(&peer_config.stake_table_entry),
995 peer_config.clone(),
996 )
997 })
998 .collect();
999
1000 let da_committee = Arc::new(DaCommittee {
1001 committee,
1002 indexed_committee,
1003 });
1004
1005 let mut inner = self.inner.write();
1006 inner
1007 .da_committees
1008 .insert(first_epoch, da_committee.clone());
1009
1010 let upper = inner
1016 .da_committees
1017 .range((Bound::Excluded(first_epoch), Bound::Unbounded))
1018 .next()
1019 .map(|(k, _)| *k);
1020
1021 let range = if let Some(u) = upper {
1022 (Bound::Included(first_epoch), Bound::Excluded(u))
1023 } else {
1024 (Bound::Included(first_epoch), Bound::Unbounded)
1025 };
1026
1027 let affected: Vec<EpochNumber> = inner.snapshots.range(range).map(|(k, _)| *k).collect();
1028 let first_epoch_field = inner.first_epoch;
1029
1030 for epoch in affected {
1031 let Some(existing) = inner.snapshots.get(&epoch) else {
1032 continue;
1033 };
1034 let new_snapshot = EpochSnapshot::new(
1035 epoch,
1036 first_epoch_field,
1037 existing.inner.committee.clone(),
1038 existing.inner.randomized.clone(),
1039 da_committee.clone(),
1040 );
1041 inner.snapshots.insert(epoch, new_snapshot);
1042 }
1043 }
1044}
1045
1046#[derive(Error, Debug)]
1047pub enum EpochCommitteesError {
1048 #[error("could not lookup leader")]
1049 LeaderLookupError,
1050
1051 #[error("block {0} is not the root block for an epoch")]
1052 NoRootBlock(BlockNumber),
1053
1054 #[error("fetcher error: {0}")]
1055 Fetcher(#[source] anyhow::Error),
1056
1057 #[error("{0}")]
1058 Message(String),
1059
1060 #[error("state catchup error: {0}")]
1061 Catchup(#[source] anyhow::Error),
1062
1063 #[error("reward calculation error: {0}")]
1064 Reward(#[source] anyhow::Error),
1065}
1066
1067impl Inner {
1068 fn resolve_da_committee(&self, epoch: Option<EpochNumber>) -> Arc<DaCommittee> {
1071 if let Some(e) = epoch {
1072 self.da_committees
1074 .range((Bound::Included(0.into()), Bound::Included(e)))
1075 .last()
1076 .map(|(_, committee)| committee.clone())
1077 .unwrap_or_else(|| self.non_epoch_snapshot.inner.da_committee.clone())
1078 } else {
1079 self.non_epoch_snapshot.inner.da_committee.clone()
1080 }
1081 }
1082
1083 fn epoch_committee(&self, e: EpochNumber) -> Option<&Arc<EpochCommittee>> {
1085 self.snapshots.get(&e).map(|s| &s.inner.committee)
1086 }
1087
1088 fn address(&self, e: EpochNumber, key: &BLSPubKey) -> anyhow::Result<Address> {
1089 self.epoch_committee(e)
1090 .context("state for found")?
1091 .address_mapping
1092 .get(key)
1093 .copied()
1094 .context(format!(
1095 "failed to get ethereum address for bls key {key}. epoch={e}"
1096 ))
1097 }
1098
1099 fn active_validators(&self, e: EpochNumber) -> anyhow::Result<AuthenticatedValidatorMap> {
1100 Ok(self
1101 .epoch_committee(e)
1102 .context("state not found")?
1103 .validators
1104 .clone())
1105 }
1106
1107 fn put_epoch_committee(&mut self, epoch: EpochNumber, committee: Arc<EpochCommittee>) {
1110 let randomized = self
1111 .snapshots
1112 .get(&epoch)
1113 .and_then(|s| s.inner.randomized.clone());
1114 let da_committee = self.resolve_da_committee(Some(epoch));
1115 let first_epoch = self.first_epoch;
1116 self.snapshots.insert(
1117 epoch,
1118 EpochSnapshot::new(epoch, first_epoch, committee, randomized, da_committee),
1119 );
1120 }
1121
1122 fn put_randomized_committee(
1126 &mut self,
1127 epoch: EpochNumber,
1128 randomized: Arc<RandomizedCommittee<StakeTableEntry<PubKey>>>,
1129 ) {
1130 let Some(existing) = self.snapshots.get(&epoch).cloned() else {
1131 return;
1132 };
1133 let committee = existing.inner.committee.clone();
1134 let da_committee = existing.inner.da_committee.clone();
1135 let first_epoch = self.first_epoch;
1136 self.snapshots.insert(
1137 epoch,
1138 EpochSnapshot::new(
1139 epoch,
1140 first_epoch,
1141 committee,
1142 Some(randomized),
1143 da_committee,
1144 ),
1145 );
1146 }
1147}
1148
1149#[derive(Clone, Debug)]
1153pub struct EpochSnapshot {
1154 inner: Arc<EpochSnapshotInner>,
1155}
1156
1157#[derive(Debug)]
1158struct EpochSnapshotInner {
1159 epoch: EpochNumber,
1160 first_epoch: Option<EpochNumber>,
1161 committee: Arc<EpochCommittee>,
1162 randomized: Option<Arc<RandomizedCommittee<StakeTableEntry<PubKey>>>>,
1163 da_committee: Arc<DaCommittee>,
1164}
1165
1166impl EpochSnapshot {
1167 fn new(
1168 epoch: EpochNumber,
1169 first_epoch: Option<EpochNumber>,
1170 committee: Arc<EpochCommittee>,
1171 randomized: Option<Arc<RandomizedCommittee<StakeTableEntry<PubKey>>>>,
1172 da_committee: Arc<DaCommittee>,
1173 ) -> Self {
1174 Self {
1175 inner: Arc::new(EpochSnapshotInner {
1176 epoch,
1177 first_epoch,
1178 committee,
1179 randomized,
1180 da_committee,
1181 }),
1182 }
1183 }
1184}
1185
1186impl EpochSnapshot {
1187 pub fn validator_index(&self, key: &PubKey) -> Option<usize> {
1189 self.inner.committee.stake_table.get_index_of(key)
1190 }
1191
1192 pub fn validator_config(
1194 &self,
1195 key: &BLSPubKey,
1196 ) -> anyhow::Result<&AuthenticatedValidator<BLSPubKey>> {
1197 let address = self
1198 .inner
1199 .committee
1200 .address_mapping
1201 .get(key)
1202 .context(format!(
1203 "failed to get ethereum address for bls key {key}. epoch={}",
1204 self.inner.epoch
1205 ))?;
1206 self.inner
1207 .committee
1208 .validators
1209 .get(address)
1210 .context("validator not found")
1211 }
1212
1213 pub fn epoch_block_reward(&self) -> Option<RewardAmount> {
1214 self.inner.committee.block_reward
1215 }
1216
1217 pub fn validators(&self) -> &AuthenticatedValidatorMap {
1218 &self.inner.committee.validators
1219 }
1220}
1221
1222impl MembershipSnapshot<SeqTypes> for EpochSnapshot {
1223 type Error = EpochCommitteesError;
1224 type StakeTableHash = StakeTableState;
1225
1226 fn epoch(&self) -> EpochNumber {
1227 self.inner.epoch
1228 }
1229
1230 fn first_epoch(&self) -> Option<EpochNumber> {
1231 self.inner.first_epoch
1232 }
1233
1234 fn has_drb(&self) -> bool {
1235 self.inner.randomized.is_some()
1236 }
1237
1238 fn stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send {
1239 self.inner.committee.stake_table.values()
1240 }
1241
1242 fn da_stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send {
1243 self.inner.da_committee.committee.iter()
1244 }
1245
1246 fn committee_members(&self, _: ViewNumber) -> impl ExactSizeIterator<Item = &PubKey> + Send {
1247 self.inner.committee.stake_table.keys()
1248 }
1249
1250 fn da_committee_members(&self, _: ViewNumber) -> impl ExactSizeIterator<Item = &PubKey> + Send {
1251 self.inner.da_committee.indexed_committee.keys()
1252 }
1253
1254 fn stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1255 self.inner.committee.stake_table.get(key).cloned()
1256 }
1257
1258 fn da_stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1259 self.inner.da_committee.indexed_committee.get(key).cloned()
1260 }
1261
1262 fn has_stake(&self, key: &PubKey) -> bool {
1263 self.stake(key)
1264 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1265 .unwrap_or_default()
1266 }
1267
1268 fn has_da_stake(&self, key: &PubKey) -> bool {
1269 self.da_stake(key)
1270 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1271 .unwrap_or_default()
1272 }
1273
1274 fn lookup_leader(&self, view: ViewNumber) -> Result<PubKey, Self::Error> {
1281 let inner = &self.inner;
1282 let Some(first_epoch) = inner.first_epoch else {
1283 error!(
1284 "leader requested for epoch {} but first_epoch is unset",
1285 inner.epoch,
1286 );
1287 return Err(EpochCommitteesError::LeaderLookupError);
1288 };
1289 if inner.epoch < first_epoch {
1290 error!(
1291 "leader requested for epoch {} before first epoch {first_epoch}",
1292 inner.epoch,
1293 );
1294 return Err(EpochCommitteesError::LeaderLookupError);
1295 }
1296 let Some(rand) = inner.randomized.as_deref() else {
1297 error!(
1298 "missing randomized committee for epoch {} in snapshot",
1299 inner.epoch,
1300 );
1301 return Err(EpochCommitteesError::LeaderLookupError);
1302 };
1303 Ok(PubKey::public_key(&select_randomized_leader(rand, *view)))
1304 }
1305
1306 fn stake_table_hash(&self) -> Option<Commitment<Self::StakeTableHash>> {
1307 self.inner.committee.stake_table_hash
1308 }
1309}
1310
1311#[derive(Clone, Debug)]
1315pub struct NonEpochSnapshot {
1316 inner: Arc<NonEpochSnapshotInner>,
1317}
1318
1319#[derive(Debug)]
1320struct NonEpochSnapshotInner {
1321 committee: Arc<NonEpochCommittee>,
1322 da_committee: Arc<DaCommittee>,
1323}
1324
1325impl NonEpochSnapshot {
1326 fn new(committee: Arc<NonEpochCommittee>, da_committee: Arc<DaCommittee>) -> Self {
1327 Self {
1328 inner: Arc::new(NonEpochSnapshotInner {
1329 committee,
1330 da_committee,
1331 }),
1332 }
1333 }
1334}
1335
1336impl NonEpochMembershipSnapshot<SeqTypes> for NonEpochSnapshot {
1337 type Error = EpochCommitteesError;
1338
1339 fn stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send + '_ {
1340 self.inner.committee.stake_table.iter()
1341 }
1342
1343 fn da_stake_table(&self) -> impl ExactSizeIterator<Item = &PeerConfig<SeqTypes>> + Send + '_ {
1344 self.inner.da_committee.committee.iter()
1345 }
1346
1347 fn committee_members(
1348 &self,
1349 _: ViewNumber,
1350 ) -> impl ExactSizeIterator<Item = &PubKey> + Send + '_ {
1351 self.inner.committee.indexed_stake_table.keys()
1352 }
1353
1354 fn da_committee_members(
1355 &self,
1356 _: ViewNumber,
1357 ) -> impl ExactSizeIterator<Item = &PubKey> + Send + '_ {
1358 self.inner.da_committee.indexed_committee.keys()
1359 }
1360
1361 fn stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1362 self.inner.committee.indexed_stake_table.get(key).cloned()
1363 }
1364
1365 fn da_stake(&self, key: &PubKey) -> Option<PeerConfig<SeqTypes>> {
1366 self.inner.da_committee.indexed_committee.get(key).cloned()
1367 }
1368
1369 fn has_stake(&self, key: &PubKey) -> bool {
1370 self.stake(key)
1371 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1372 .unwrap_or_default()
1373 }
1374
1375 fn has_da_stake(&self, key: &PubKey) -> bool {
1376 self.da_stake(key)
1377 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1378 .unwrap_or_default()
1379 }
1380
1381 fn lookup_leader(&self, view: ViewNumber) -> Result<PubKey, Self::Error> {
1382 let leaders = &self.inner.committee.eligible_leaders;
1383 if leaders.is_empty() {
1384 return Err(EpochCommitteesError::LeaderLookupError);
1385 }
1386 let index = *view as usize % leaders.len();
1387 Ok(PubKey::public_key(&leaders[index].stake_table_entry))
1388 }
1389}
1390
1391#[cfg(test)]
1392mod tests {
1393 use std::sync::{
1394 Arc,
1395 atomic::{AtomicBool, AtomicUsize, Ordering},
1396 };
1397
1398 use committable::Committable;
1399 use hotshot_query_service::testing::mocks::MOCK_UPGRADE;
1400 use hotshot_types::{
1401 ValidatorConfig,
1402 traits::{BlockPayload, block_contents::BlockHeader},
1403 };
1404 use tokio::{task::JoinSet, time::Duration};
1405
1406 use super::*;
1407 use crate::{NodeState, Payload, Transaction};
1408
1409 const TEST_DURATION: Duration = Duration::from_secs(5);
1413
1414 fn build_committees(num_peers: u64) -> EpochCommittees {
1415 let peers: Vec<PeerConfig<SeqTypes>> = (0..num_peers)
1416 .map(|i| {
1417 ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1418 [42u8; 32],
1419 i,
1420 U256::from(100),
1421 true,
1422 )
1423 .public_config()
1424 })
1425 .collect();
1426 EpochCommittees::new_stake(peers.clone(), peers, None, Fetcher::mock(), 100u64)
1427 }
1428
1429 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1443 async fn concurrent_reads_during_mutations() {
1444 let committees = build_committees(8);
1445 committees.set_first_epoch(EpochNumber::new(1), [0u8; 32]);
1446
1447 {
1452 let mut inner = committees.inner.write();
1453 let template = inner
1454 .epoch_committee(EpochNumber::genesis())
1455 .expect("genesis committee exists")
1456 .clone();
1457 for e in 2..6 {
1458 inner.put_epoch_committee(EpochNumber::new(e), template.clone());
1459 }
1460 }
1461
1462 let stop = Arc::new(AtomicBool::new(false));
1463 let mut tasks = JoinSet::new();
1464
1465 for _ in 0..8 {
1466 let c = committees.clone();
1467 let stop = Arc::clone(&stop);
1468 tasks.spawn(async move {
1469 let stable = EpochNumber::new(1);
1470 let mutating = EpochNumber::new(3);
1471 let view = ViewNumber::new(0);
1472 while !stop.load(Ordering::Relaxed) {
1473 let stable_snap = c.snapshot(stable).expect("stable snapshot");
1479 let len = stable_snap.stake_table().len();
1480 assert_eq!(len, stable_snap.total_nodes());
1481 let leader = stable_snap.lookup_leader(view).expect("leader");
1482 assert!(
1483 stable_snap.committee_members(view).any(|p| p == &leader),
1484 "leader {leader:?} not in committee_members for stable epoch",
1485 );
1486 assert_eq!(c.first_epoch(), Some(stable));
1487
1488 let _ = c.snapshot(mutating);
1494 if let Some(s) = c.snapshot(mutating) {
1495 let _ = s.lookup_leader(view);
1496 }
1497 tokio::task::yield_now().await;
1498 }
1499 });
1500 }
1501
1502 tasks.spawn({
1506 let c = committees.clone();
1507 let stop = Arc::clone(&stop);
1508 async move {
1509 let extra: Vec<PeerConfig<SeqTypes>> = (0..3)
1510 .map(|i| {
1511 ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1512 [99u8; 32],
1513 i,
1514 U256::from(50),
1515 true,
1516 )
1517 .public_config()
1518 })
1519 .collect();
1520 let mut i: u64 = 0;
1521 while !stop.load(Ordering::Relaxed) {
1522 c.add_drb_result(EpochNumber::new(2 + (i % 4)), [(i % 256) as u8; 32]);
1525 c.add_drb_result(EpochNumber::new(10_000 + i), [0xAB; 32]);
1528 if i.is_multiple_of(50) {
1529 c.add_da_committee(i.into(), extra.clone());
1530 }
1531 if i.is_multiple_of(16) {
1532 tokio::task::yield_now().await;
1533 }
1534 i += 1;
1535 }
1536 }
1537 });
1538
1539 tokio::time::sleep(TEST_DURATION).await;
1540 stop.store(true, Ordering::Relaxed);
1541 while let Some(res) = tasks.join_next().await {
1542 res.expect("task panicked");
1543 }
1544 }
1545
1546 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1558 async fn set_first_epoch_is_atomic() {
1559 fn snapshot(c: &EpochCommittees, e: EpochNumber) -> Snapshot {
1562 let inner = c.inner.read();
1563 Snapshot {
1564 first_epoch: inner.first_epoch,
1565 state_e: inner.snapshots.contains_key(&e),
1566 state_e1: inner.snapshots.contains_key(&(e + 1)),
1567 rand_e: inner
1568 .snapshots
1569 .get(&e)
1570 .is_some_and(|s| s.inner.randomized.is_some()),
1571 rand_e1: inner
1572 .snapshots
1573 .get(&(e + 1))
1574 .is_some_and(|s| s.inner.randomized.is_some()),
1575 }
1576 }
1577
1578 #[derive(Debug)]
1579 struct Snapshot {
1580 first_epoch: Option<EpochNumber>,
1581 state_e: bool,
1582 state_e1: bool,
1583 rand_e: bool,
1584 rand_e1: bool,
1585 }
1586
1587 let target = EpochNumber::new(10);
1588
1589 let test_start = tokio::time::Instant::now();
1594 let mut round: u64 = 0;
1595 while test_start.elapsed() < TEST_DURATION {
1596 let committees = build_committees(4);
1597 let stop = Arc::new(AtomicBool::new(false));
1598 let post_observations = Arc::new(AtomicUsize::new(0));
1599
1600 let reader = {
1601 let c = committees.clone();
1602 let stop = Arc::clone(&stop);
1603 let post = Arc::clone(&post_observations);
1604 tokio::spawn(async move {
1605 while !stop.load(Ordering::Relaxed) {
1606 let s = snapshot(&c, target);
1607 match s.first_epoch {
1608 None => assert!(
1609 !s.state_e && !s.state_e1 && !s.rand_e && !s.rand_e1,
1610 "torn snapshot: first_epoch=None but some target state present: \
1611 {s:?}",
1612 ),
1613 Some(e) => {
1614 assert_eq!(e, target, "only target is ever set");
1615 assert!(
1616 s.state_e && s.state_e1 && s.rand_e && s.rand_e1,
1617 "torn snapshot: first_epoch=Some but some target state \
1618 missing: {s:?}",
1619 );
1620 post.fetch_add(1, Ordering::Relaxed);
1621 },
1622 }
1623 tokio::task::yield_now().await;
1624 }
1625 })
1626 };
1627
1628 tokio::time::sleep(Duration::from_millis(2)).await;
1630 committees.set_first_epoch(target, [(round as u8) ^ 0xA5; 32]);
1631
1632 let deadline = tokio::time::Instant::now() + Duration::from_millis(200);
1635 while tokio::time::Instant::now() < deadline
1636 && post_observations.load(Ordering::Relaxed) == 0
1637 {
1638 tokio::task::yield_now().await;
1639 }
1640
1641 stop.store(true, Ordering::Relaxed);
1642 reader.await.expect("reader panicked");
1643 assert!(
1644 post_observations.load(Ordering::Relaxed) > 0,
1645 "round {round}: reader never observed post-set state",
1646 );
1647 round += 1;
1648 }
1649 assert!(round > 0, "test loop never executed a round");
1650 }
1651
1652 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1658 async fn concurrent_add_drb_result_same_epoch() {
1659 let committees = build_committees(4);
1660 let epoch = EpochNumber::new(1);
1661 committees.set_first_epoch(epoch, [0u8; 32]);
1664
1665 let stop = Arc::new(AtomicBool::new(false));
1666 let writes = Arc::new(AtomicUsize::new(0));
1667 let lookups = Arc::new(AtomicUsize::new(0));
1668
1669 let mut writers = JoinSet::new();
1670 let mut readers = JoinSet::new();
1671
1672 for _ in 0..4 {
1676 let c = committees.clone();
1677 let stop = Arc::clone(&stop);
1678 let lookups = Arc::clone(&lookups);
1679 readers.spawn(async move {
1680 let view = ViewNumber::new(0);
1681 while !stop.load(Ordering::Relaxed) {
1682 c.snapshot(epoch)
1683 .expect("snapshot")
1684 .lookup_leader(view)
1685 .expect("randomized committee must remain present once set");
1686 lookups.fetch_add(1, Ordering::Relaxed);
1687 tokio::task::yield_now().await;
1688 }
1689 });
1690 }
1691
1692 for tid in 0..8u8 {
1696 let c = committees.clone();
1697 let stop = Arc::clone(&stop);
1698 let writes = Arc::clone(&writes);
1699 writers.spawn(async move {
1700 let mut i: u64 = 0;
1701 while !stop.load(Ordering::Relaxed) {
1702 let mut drb = [tid; 32];
1703 drb[0] = (i & 0xFF) as u8;
1704 c.add_drb_result(epoch, drb);
1705 writes.fetch_add(1, Ordering::Relaxed);
1706 if i.is_multiple_of(16) {
1707 tokio::task::yield_now().await;
1708 }
1709 i += 1;
1710 }
1711 });
1712 }
1713
1714 tokio::time::sleep(TEST_DURATION).await;
1715 stop.store(true, Ordering::Relaxed);
1716 while let Some(res) = writers.join_next().await {
1717 res.expect("writer panicked");
1718 }
1719 while let Some(res) = readers.join_next().await {
1720 res.expect("reader panicked");
1721 }
1722
1723 assert!(writes.load(Ordering::Relaxed) > 0, "writers never advanced",);
1724 assert!(
1725 lookups.load(Ordering::Relaxed) > 0,
1726 "readers never observed the randomized committee",
1727 );
1728 let snap = committees
1729 .snapshot(epoch)
1730 .expect("randomized committee must survive concurrent writes");
1731 assert!(snap.has_drb(), "randomized committee must remain present");
1732 let view = ViewNumber::new(0);
1733 let _leader = snap
1734 .lookup_leader(view)
1735 .expect("lookup_leader succeeds when randomized committee is present");
1736 }
1737
1738 async fn build_epoch_root_header() -> Header {
1742 let instance = NodeState::mock_v2();
1743 let tx = Transaction::of_size(10);
1744 let (payload, _) = Payload::from_transactions([tx], &instance.genesis_state, &instance)
1745 .await
1746 .expect("payload");
1747 let metadata = payload.ns_table().clone();
1748 let header = Header::genesis(&instance, payload, &metadata, MOCK_UPGRADE.base);
1749 match header {
1750 Header::V2(mut h) => {
1751 h.height = 95;
1752 Header::V2(h)
1753 },
1754 other => panic!("expected V2 header from NodeState::mock_v2, got {other:?}"),
1755 }
1756 }
1757
1758 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1772 async fn add_epoch_root_is_atomic() {
1773 let header = build_epoch_root_header().await;
1774 let target = EpochNumber::new(3);
1775
1776 fn snapshot(c: &EpochCommittees, e: EpochNumber) -> (bool, bool) {
1777 let inner = c.inner.read();
1778 let header_set = inner
1779 .snapshots
1780 .get(&e)
1781 .map(|s| s.inner.committee.header.is_some())
1782 .unwrap_or(false);
1783 let all_validators_present = inner.all_validators.contains_key(&e);
1784 (header_set, all_validators_present)
1785 }
1786
1787 let test_start = tokio::time::Instant::now();
1791 let mut round: u64 = 0;
1792 while test_start.elapsed() < TEST_DURATION {
1793 let committees = build_committees(4);
1794
1795 {
1800 let mut inner = committees.inner.write();
1801 let template = inner
1802 .epoch_committee(EpochNumber::genesis())
1803 .expect("genesis committee exists");
1804 let prefilled = EpochCommittee {
1805 block_reward: Some(RewardAmount::default()),
1806 stake_table_hash: Some(StakeTableState::default().commit()),
1807 header: None,
1808 eligible_leaders: template.eligible_leaders.clone(),
1809 stake_table: template.stake_table.clone(),
1810 validators: template.validators.clone(),
1811 address_mapping: template.address_mapping.clone(),
1812 };
1813 inner.put_epoch_committee(target, Arc::new(prefilled));
1814 }
1815
1816 let stop = Arc::new(AtomicBool::new(false));
1817 let post = Arc::new(AtomicUsize::new(0));
1818
1819 let reader = {
1820 let c = committees.clone();
1821 let stop = Arc::clone(&stop);
1822 let post = Arc::clone(&post);
1823 tokio::spawn(async move {
1824 while !stop.load(Ordering::Relaxed) {
1825 match snapshot(&c, target) {
1826 (false, false) => {}, (true, true) => {
1828 post.fetch_add(1, Ordering::Relaxed);
1829 },
1830 torn => panic!(
1831 "round {round}: torn snapshot for epoch {target}: header_set={}, \
1832 all_validators_present={}",
1833 torn.0, torn.1,
1834 ),
1835 }
1836 tokio::task::yield_now().await;
1837 }
1838 })
1839 };
1840
1841 tokio::time::sleep(Duration::from_millis(2)).await;
1844 committees
1845 .add_epoch_root(header.clone())
1846 .await
1847 .expect("add_epoch_root should succeed for the prefilled state");
1848
1849 let deadline = tokio::time::Instant::now() + Duration::from_millis(200);
1850 while tokio::time::Instant::now() < deadline && post.load(Ordering::Relaxed) == 0 {
1851 tokio::task::yield_now().await;
1852 }
1853 stop.store(true, Ordering::Relaxed);
1854 reader.await.expect("reader panicked");
1855 assert!(
1856 post.load(Ordering::Relaxed) > 0,
1857 "round {round}: reader never observed post-state",
1858 );
1859 round += 1;
1860 }
1861 assert!(round > 0, "test loop never executed a round");
1862 }
1863
1864 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1870 async fn add_da_committee_rebuilds_affected_snapshots() {
1871 fn da_keys(c: &EpochCommittees, e: EpochNumber) -> Vec<PubKey> {
1872 c.snapshot(e)
1873 .expect("snapshot")
1874 .da_stake_table()
1875 .map(|p| PubKey::public_key(&p.stake_table_entry))
1876 .collect()
1877 }
1878
1879 let committees = build_committees(4);
1883 committees.set_first_epoch(EpochNumber::new(2), [0u8; 32]);
1884
1885 let initial_e2 = da_keys(&committees, EpochNumber::new(2));
1886 let initial_e3 = da_keys(&committees, EpochNumber::new(3));
1887
1888 let new_da: Vec<PeerConfig<SeqTypes>> = (0..2)
1889 .map(|i| {
1890 ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1891 [123u8; 32],
1892 i,
1893 U256::from(50),
1894 true,
1895 )
1896 .public_config()
1897 })
1898 .collect();
1899 let new_da_keys: Vec<PubKey> = new_da
1900 .iter()
1901 .map(|p| PubKey::public_key(&p.stake_table_entry))
1902 .collect();
1903 assert_ne!(
1904 new_da_keys, initial_e2,
1905 "test setup: new DA must differ from initial"
1906 );
1907
1908 committees.add_da_committee(EpochNumber::new(2), new_da);
1913
1914 assert_eq!(
1915 da_keys(&committees, EpochNumber::new(2)),
1916 new_da_keys,
1917 "snapshot(2) must reflect the new DA",
1918 );
1919 assert_eq!(
1920 da_keys(&committees, EpochNumber::new(3)),
1921 new_da_keys,
1922 "snapshot(3) must reflect the new DA",
1923 );
1924 assert_eq!(
1925 da_keys(&committees, EpochNumber::new(1)),
1926 initial_e2,
1927 "snapshot(1) lies before first_epoch=2 and must keep the bootstrap DA",
1928 );
1929 assert_eq!(initial_e2, initial_e3);
1934 }
1935
1936 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1940 async fn add_da_committee_layered_does_not_rebuild_earlier_ranges() {
1941 fn da_keys(c: &EpochCommittees, e: EpochNumber) -> Vec<PubKey> {
1942 c.snapshot(e)
1943 .expect("snapshot")
1944 .da_stake_table()
1945 .map(|p| PubKey::public_key(&p.stake_table_entry))
1946 .collect()
1947 }
1948
1949 let committees = build_committees(4);
1950 committees.set_first_epoch(EpochNumber::new(1), [0u8; 32]);
1951
1952 {
1956 let mut inner = committees.inner.write();
1957 let template = inner
1958 .epoch_committee(EpochNumber::genesis())
1959 .expect("genesis committee exists")
1960 .clone();
1961 for e in 3..6 {
1962 inner.put_epoch_committee(EpochNumber::new(e), template.clone());
1963 }
1964 }
1965
1966 let da_b: Vec<PeerConfig<SeqTypes>> = (0..2)
1967 .map(|i| {
1968 ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1969 [200u8; 32],
1970 i,
1971 U256::from(50),
1972 true,
1973 )
1974 .public_config()
1975 })
1976 .collect();
1977 let da_b_keys: Vec<PubKey> = da_b
1978 .iter()
1979 .map(|p| PubKey::public_key(&p.stake_table_entry))
1980 .collect();
1981
1982 let da_c: Vec<PeerConfig<SeqTypes>> = (0..2)
1983 .map(|i| {
1984 ValidatorConfig::<SeqTypes>::generated_from_seed_indexed(
1985 [201u8; 32],
1986 i,
1987 U256::from(50),
1988 true,
1989 )
1990 .public_config()
1991 })
1992 .collect();
1993 let da_c_keys: Vec<PubKey> = da_c
1994 .iter()
1995 .map(|p| PubKey::public_key(&p.stake_table_entry))
1996 .collect();
1997
1998 committees.add_da_committee(EpochNumber::new(5), da_c);
2001 committees.add_da_committee(EpochNumber::new(3), da_b);
2002
2003 assert_eq!(da_keys(&committees, EpochNumber::new(3)), da_b_keys);
2004 assert_eq!(da_keys(&committees, EpochNumber::new(4)), da_b_keys);
2005 assert_eq!(
2006 da_keys(&committees, EpochNumber::new(5)),
2007 da_c_keys,
2008 "epoch 5 must keep C — it is outside B's range",
2009 );
2010 }
2011}