hotshot_new_protocol/
epoch.rs1use std::{
2 collections::{BTreeMap, BTreeSet},
3 mem::swap,
4};
5
6use hotshot_types::{
7 data::{BlockNumber, EpochNumber, Leaf2},
8 drb::DrbResult,
9 epoch_membership::EpochMembershipCoordinator,
10 traits::{block_contents::BlockHeader, node_implementation::NodeType},
11 utils::{is_epoch_root, is_transition_block},
12};
13use hotshot_utils::anytrace;
14use tokio::task::{AbortHandle, JoinSet};
15use tracing::error;
16
17pub enum EpochRootResult {
18 DrbResult(EpochNumber, DrbResult),
19}
20
21pub struct EpochFailure {
23 pub epoch: EpochNumber,
24 pub error: EpochManagerError,
25}
26
27type TaskOutput = (EpochNumber, Result<EpochRootResult, EpochManagerError>);
31
32pub struct EpochManager<T: NodeType> {
38 epoch_height: BlockNumber,
39 membership_coordinator: EpochMembershipCoordinator<T>,
40 tasks: JoinSet<TaskOutput>,
41 handles: BTreeMap<EpochNumber, Vec<AbortHandle>>,
42 pending_drb_requests: BTreeSet<EpochNumber>,
45 completed_drb_requests: BTreeSet<EpochNumber>,
48}
49
50impl<T: NodeType> EpochManager<T> {
51 pub fn new<B>(epoch_height: B, membership_coordinator: EpochMembershipCoordinator<T>) -> Self
52 where
53 B: Into<BlockNumber>,
54 {
55 Self {
56 epoch_height: epoch_height.into(),
57 membership_coordinator,
58 tasks: JoinSet::new(),
59 handles: BTreeMap::new(),
60 pending_drb_requests: BTreeSet::new(),
61 completed_drb_requests: BTreeSet::new(),
62 }
63 }
64
65 pub async fn next(&mut self) -> Option<Result<EpochRootResult, EpochFailure>> {
66 loop {
67 match self.tasks.join_next().await {
68 Some(Ok((epoch, result))) => {
69 match result {
70 Ok(root @ EpochRootResult::DrbResult(..)) => {
71 self.pending_drb_requests.remove(&epoch);
72 self.completed_drb_requests.insert(epoch);
73 return Some(Ok(root));
74 },
75 Err(error) => {
76 self.pending_drb_requests.remove(&epoch);
78 return Some(Err(EpochFailure { epoch, error }));
79 },
80 }
81 },
82 Some(Err(err)) => {
83 if !err.is_cancelled() {
84 error!(%err, "epoch manager task panic")
85 }
86 },
87 None => return None,
88 }
89 }
90 }
91
92 pub fn handle_leaf_decided(&mut self, leaf: Leaf2<T>) {
93 let block_number = leaf.block_header().block_number();
94
95 if is_epoch_root(block_number, *self.epoch_height) {
98 let Some(epoch) = leaf.epoch(*self.epoch_height) else {
99 error!("Leaf has no epoch");
100 return;
101 };
102 self.request_drb_result(epoch + 2);
103 }
104
105 if is_transition_block(block_number, *self.epoch_height)
107 && let Some(epoch) = leaf.epoch(*self.epoch_height)
108 && let Some(drb) = leaf.next_drb_result
109 {
110 let target_epoch = epoch + 1;
111 if !self.completed_drb_requests.contains(&target_epoch) {
112 self.membership_coordinator.supply_drb(target_epoch, drb);
113 self.completed_drb_requests.insert(target_epoch);
114 self.pending_drb_requests.remove(&target_epoch);
115 }
116 }
117 }
118
119 pub fn gc(&mut self, epoch: EpochNumber) {
120 let mut tmp = self.handles.split_off(&epoch);
121 swap(&mut tmp, &mut self.handles);
122 for handle in tmp.into_values().flatten() {
123 handle.abort();
124 }
125 self.pending_drb_requests = self.pending_drb_requests.split_off(&epoch);
128 self.completed_drb_requests = self.completed_drb_requests.split_off(&epoch);
129 }
130
131 pub fn request_drb_result(&mut self, epoch: EpochNumber) {
132 if self.completed_drb_requests.contains(&epoch) {
134 return;
135 }
136 if self.pending_drb_requests.contains(&epoch) {
138 return;
139 }
140 self.pending_drb_requests.insert(epoch);
141 let membership_coordinator = self.membership_coordinator.clone();
142 let handles = self.handles.entry(epoch).or_default();
143
144 handles.push(self.tasks.spawn(async move {
145 let result = async {
146 let membership = match membership_coordinator.membership_for_epoch(Some(epoch)) {
152 Ok(m) => m,
153 Err(_) => membership_coordinator
154 .wait_for_catchup(epoch)
155 .await
156 .map_err(EpochManagerError::DrbLookup)?,
157 };
158 let drb = membership
159 .get_epoch_drb()
160 .await
161 .map_err(EpochManagerError::DrbLookup)?;
162 Ok(EpochRootResult::DrbResult(epoch, drb))
163 }
164 .await;
165 (epoch, result)
166 }));
167 }
168}
169
170#[derive(Debug, thiserror::Error)]
171pub enum EpochManagerError {
172 #[error("failed to add epoch root: {0}")]
173 EpochRoot(#[source] anyhow::Error),
174
175 #[error("failed to compute drb: {0}")]
176 DrbCompute(#[source] anytrace::Error),
177
178 #[error("failed to get drb: {0}")]
179 DrbLookup(#[source] anytrace::Error),
180}