Skip to main content

hotshot_new_protocol/
epoch.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    mem::swap,
4};
5
6use hotshot_types::{
7    data::{BlockNumber, EpochNumber, Leaf2},
8    drb::DrbResult,
9    epoch_membership::EpochMembershipCoordinator,
10    traits::{block_contents::BlockHeader, node_implementation::NodeType},
11    utils::{is_epoch_root, is_transition_block},
12};
13use hotshot_utils::anytrace;
14use tokio::task::{AbortHandle, JoinSet};
15use tracing::error;
16
17pub enum EpochRootResult {
18    DrbResult(EpochNumber, DrbResult),
19}
20
21/// Epoch + error for the Err path so retries can re-kick the task.
22pub struct EpochFailure {
23    pub epoch: EpochNumber,
24    pub error: EpochManagerError,
25}
26
27/// Output of a spawned DRB task.  The leading `EpochNumber` tags the task
28/// with the epoch it was working on so [`EpochManager::next`] can update
29/// dedup state correctly even on the Err path.
30type TaskOutput = (EpochNumber, Result<EpochRootResult, EpochManagerError>);
31
32/// Manager for Epoch specific rules and actions.
33///
34/// Delegates all catchup (stake-table walk-back, epoch root fetch, DRB
35/// compute) to [`EpochMembershipCoordinator::membership_for_epoch`].  This
36/// manager exists only to issue the request and dedup concurrent callers.
37pub struct EpochManager<T: NodeType> {
38    epoch_height: BlockNumber,
39    membership_coordinator: EpochMembershipCoordinator<T>,
40    tasks: JoinSet<TaskOutput>,
41    handles: BTreeMap<EpochNumber, Vec<AbortHandle>>,
42    /// Epochs for which a `request_drb_result` task is currently in flight.
43    /// Prevents duplicate fetch/compute tasks while the first is running.
44    pending_drb_requests: BTreeSet<EpochNumber>,
45    /// Epochs whose DRB has already been computed and added to membership.
46    /// Subsequent `request_drb_result` calls for these epochs are no-ops.
47    completed_drb_requests: BTreeSet<EpochNumber>,
48}
49
50impl<T: NodeType> EpochManager<T> {
51    pub fn new<B>(epoch_height: B, membership_coordinator: EpochMembershipCoordinator<T>) -> Self
52    where
53        B: Into<BlockNumber>,
54    {
55        Self {
56            epoch_height: epoch_height.into(),
57            membership_coordinator,
58            tasks: JoinSet::new(),
59            handles: BTreeMap::new(),
60            pending_drb_requests: BTreeSet::new(),
61            completed_drb_requests: BTreeSet::new(),
62        }
63    }
64
65    pub async fn next(&mut self) -> Option<Result<EpochRootResult, EpochFailure>> {
66        loop {
67            match self.tasks.join_next().await {
68                Some(Ok((epoch, result))) => {
69                    match result {
70                        Ok(root @ EpochRootResult::DrbResult(..)) => {
71                            self.pending_drb_requests.remove(&epoch);
72                            self.completed_drb_requests.insert(epoch);
73                            return Some(Ok(root));
74                        },
75                        Err(error) => {
76                            // Clear the guard so a subsequent call can retry.
77                            self.pending_drb_requests.remove(&epoch);
78                            return Some(Err(EpochFailure { epoch, error }));
79                        },
80                    }
81                },
82                Some(Err(err)) => {
83                    if !err.is_cancelled() {
84                        error!(%err, "epoch manager task panic")
85                    }
86                },
87                None => return None,
88            }
89        }
90    }
91
92    pub fn handle_leaf_decided(&mut self, leaf: Leaf2<T>) {
93        let block_number = leaf.block_header().block_number();
94
95        // At every epoch root, trigger DRB computation for the epoch that
96        // will use this root (epoch + 2).
97        if is_epoch_root(block_number, *self.epoch_height) {
98            let Some(epoch) = leaf.epoch(*self.epoch_height) else {
99                error!("Leaf has no epoch");
100                return;
101            };
102            self.request_drb_result(epoch + 2);
103        }
104
105        // If this is the transition block of an epoch feed the DRB result to the coordinator.
106        if is_transition_block(block_number, *self.epoch_height)
107            && let Some(epoch) = leaf.epoch(*self.epoch_height)
108            && let Some(drb) = leaf.next_drb_result
109        {
110            let target_epoch = epoch + 1;
111            if !self.completed_drb_requests.contains(&target_epoch) {
112                self.membership_coordinator.supply_drb(target_epoch, drb);
113                self.completed_drb_requests.insert(target_epoch);
114                self.pending_drb_requests.remove(&target_epoch);
115            }
116        }
117    }
118
119    pub fn gc(&mut self, epoch: EpochNumber) {
120        let mut tmp = self.handles.split_off(&epoch);
121        swap(&mut tmp, &mut self.handles);
122        for handle in tmp.into_values().flatten() {
123            handle.abort();
124        }
125        // Drop tracking entries for epochs we no longer care about.  Keeps
126        // `completed_drb_requests` bounded while the protocol runs.
127        self.pending_drb_requests = self.pending_drb_requests.split_off(&epoch);
128        self.completed_drb_requests = self.completed_drb_requests.split_off(&epoch);
129    }
130
131    pub fn request_drb_result(&mut self, epoch: EpochNumber) {
132        // Already computed — caller can read the DRB from membership.
133        if self.completed_drb_requests.contains(&epoch) {
134            return;
135        }
136        // In-flight task will deliver the result; avoid spawning a duplicate.
137        if self.pending_drb_requests.contains(&epoch) {
138            return;
139        }
140        self.pending_drb_requests.insert(epoch);
141        let membership_coordinator = self.membership_coordinator.clone();
142        let handles = self.handles.entry(epoch).or_default();
143
144        handles.push(self.tasks.spawn(async move {
145            let result = async {
146                // Kick the membership coordinator.  If the stake table is
147                // already ready, this returns it immediately; otherwise it
148                // spawns a catchup task and returns a "catchup in progress"
149                // error.  Either way, `wait_for_catchup` resolves once the
150                // stake table + DRB are both in place.
151                let membership = match membership_coordinator.membership_for_epoch(Some(epoch)) {
152                    Ok(m) => m,
153                    Err(_) => membership_coordinator
154                        .wait_for_catchup(epoch)
155                        .await
156                        .map_err(EpochManagerError::DrbLookup)?,
157                };
158                let drb = membership
159                    .get_epoch_drb()
160                    .await
161                    .map_err(EpochManagerError::DrbLookup)?;
162                Ok(EpochRootResult::DrbResult(epoch, drb))
163            }
164            .await;
165            (epoch, result)
166        }));
167    }
168}
169
170#[derive(Debug, thiserror::Error)]
171pub enum EpochManagerError {
172    #[error("failed to add epoch root: {0}")]
173    EpochRoot(#[source] anyhow::Error),
174
175    #[error("failed to compute drb: {0}")]
176    DrbCompute(#[source] anytrace::Error),
177
178    #[error("failed to get drb: {0}")]
179    DrbLookup(#[source] anytrace::Error),
180}