Skip to main content

hotshot_new_protocol/
epoch_root_vote_collector.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use committable::Committable;
4use hotshot::types::SignatureKey;
5use hotshot_types::{
6    data::{EpochNumber, ViewNumber},
7    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
8    message::UpgradeLock,
9    simple_certificate::{LightClientStateUpdateCertificateV2, QuorumCertificate2},
10    simple_vote::{HasEpoch, QuorumVote2, VersionedVoteData},
11    stake_table::StakeTableEntries,
12    traits::node_implementation::NodeType,
13    vote::{
14        Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
15    },
16};
17use tokio::{
18    sync::mpsc::{self},
19    task::{AbortHandle, JoinSet},
20};
21use tracing::{debug, instrument, warn};
22
23use crate::message::Vote1;
24
25/// Combined collector for epoch-root views. Runs both a quorum-vote accumulator
26/// (producing a `QuorumCertificate2`) and a light-client-state-update-vote
27/// accumulator (producing a `LightClientStateUpdateCertificateV2`) against the
28/// same `Vote1` stream, emitting the pair only when **both** cross threshold.
29///
30/// This preserves the old protocol's atomicity property: `Consensus` never sees
31/// an epoch-root Cert1 without the matching `state_cert`.
32pub struct EpochRootVoteCollector<T: NodeType> {
33    per_view: BTreeMap<ViewNumber, (mpsc::Sender<Vote1<T>>, AbortHandle)>,
34    completed: BTreeSet<ViewNumber>,
35    epoch_membership_coordinator: EpochMembershipCoordinator<T>,
36    membership_cache: BTreeMap<EpochNumber, EpochMembership<T>>,
37    upgrade_lock: UpgradeLock<T>,
38    tasks: JoinSet<(
39        QuorumCertificate2<T>,
40        LightClientStateUpdateCertificateV2<T>,
41    )>,
42}
43
44impl<T: NodeType> EpochRootVoteCollector<T> {
45    #[instrument(level = "debug", skip_all)]
46    pub fn new(
47        epoch_membership_coordinator: EpochMembershipCoordinator<T>,
48        upgrade_lock: UpgradeLock<T>,
49    ) -> Self {
50        Self {
51            per_view: BTreeMap::new(),
52            completed: BTreeSet::new(),
53            epoch_membership_coordinator,
54            membership_cache: BTreeMap::new(),
55            upgrade_lock,
56            tasks: JoinSet::new(),
57        }
58    }
59
60    pub async fn next(
61        &mut self,
62    ) -> Option<(
63        QuorumCertificate2<T>,
64        LightClientStateUpdateCertificateV2<T>,
65    )> {
66        loop {
67            match self.tasks.join_next().await {
68                Some(Ok((cert1, state_cert))) => {
69                    let view = cert1.view_number();
70                    if self.completed.contains(&view) {
71                        continue;
72                    }
73                    self.completed.insert(view);
74                    return Some((cert1, state_cert));
75                },
76                Some(Err(e)) if e.is_cancelled() => {
77                    debug!("Epoch-root vote collection task cancelled: {e}");
78                },
79                Some(Err(e)) => {
80                    warn!("Error in epoch-root vote collection task: {e}");
81                },
82                None => return None,
83            }
84        }
85    }
86
87    /// Accumulate a `Vote1` for an epoch-root view. Caller should have verified
88    /// `vote1.state_vote.is_some()`.
89    pub async fn accumulate(&mut self, vote1: Vote1<T>) {
90        debug_assert!(
91            vote1.state_vote.is_some(),
92            "EpochRootVoteCollector::accumulate called with a Vote1 missing state_vote"
93        );
94
95        let view = vote1.vote.view_number();
96        if self.completed.contains(&view) {
97            return;
98        }
99        let Some(membership) = self.resolve_membership(&vote1.vote).await else {
100            return;
101        };
102        let (tx, _abort_handle) = self.per_view.entry(view).or_insert_with(|| {
103            let (tx, rx) = mpsc::channel(100);
104            let abort_handle = self.tasks.spawn(Self::run_per_view(
105                rx,
106                membership,
107                self.upgrade_lock.clone(),
108            ));
109            (tx, abort_handle)
110        });
111        let _ = tx.send(vote1).await;
112    }
113
114    async fn resolve_membership(&mut self, vote: &QuorumVote2<T>) -> Option<EpochMembership<T>> {
115        let epoch = vote.epoch()?;
116        if let Some(m) = self.membership_cache.get(&epoch) {
117            return Some(m.clone());
118        }
119        let m = self
120            .epoch_membership_coordinator
121            .membership_for_epoch(Some(epoch))
122            .ok()?;
123        self.membership_cache.insert(epoch, m.clone());
124        Some(m)
125    }
126
127    #[instrument(level = "debug", skip_all)]
128    async fn run_per_view(
129        mut rx: mpsc::Receiver<Vote1<T>>,
130        membership: EpochMembership<T>,
131        lock: UpgradeLock<T>,
132    ) -> (
133        QuorumCertificate2<T>,
134        LightClientStateUpdateCertificateV2<T>,
135    ) {
136        let mut quorum_accumulator =
137            VoteAccumulator::<T, QuorumVote2<T>, QuorumCertificate2<T>>::new(lock.clone());
138        let mut state_accumulator = LightClientStateUpdateVoteAccumulator::<T> {
139            vote_outcomes: HashMap::new(),
140            upgrade_lock: lock.clone(),
141        };
142
143        let mut quorum_cert: Option<QuorumCertificate2<T>> = None;
144        let mut state_cert: Option<LightClientStateUpdateCertificateV2<T>> = None;
145        let mut quorum_votes: Vec<QuorumVote2<T>> = Vec::new();
146
147        while let Some(vote1) = rx.recv().await {
148            let state_vote = match vote1.state_vote.clone() {
149                Some(sv) => sv,
150                None => {
151                    tracing::error!(
152                        "EpochRootVoteCollector::run_per_view called with a Vote1 missing \
153                         state_vote"
154                    );
155                    continue;
156                },
157            };
158            let bls_key = vote1.vote.signing_key();
159
160            if quorum_cert.is_none() {
161                match quorum_accumulator.accumulate(&vote1.vote, membership.clone()) {
162                    Some(cert) => {
163                        let stake_table =
164                            <QuorumCertificate2<T> as Certificate<T, _>>::stake_table(&membership);
165                        let threshold =
166                            <QuorumCertificate2<T> as Certificate<T, _>>::threshold(&membership);
167                        match cert.is_valid_cert(
168                            &StakeTableEntries::<T>::from(stake_table).0,
169                            threshold,
170                            &lock,
171                        ) {
172                            Ok(()) => {
173                                quorum_cert = Some(cert);
174                            },
175                            Err(e) => {
176                                warn!("Invalid quorum certificate formed at epoch-root view: {e}");
177                                // Retry from previously-seen votes (mirror VoteCollector recovery).
178                                quorum_votes.push(vote1.vote.clone());
179                                quorum_votes.retain(|v| {
180                                    let vote_commitment = generate_vote_commitment(v, &lock);
181                                    vote_commitment.is_some_and(|commitment| {
182                                        v.signing_key()
183                                            .validate(&v.signature(), commitment.as_ref())
184                                    })
185                                });
186                                quorum_accumulator = VoteAccumulator::new(lock.clone());
187                                for v in &quorum_votes {
188                                    if let Some(cert) =
189                                        quorum_accumulator.accumulate(v, membership.clone())
190                                    {
191                                        quorum_cert = Some(cert);
192                                        break;
193                                    }
194                                }
195                            },
196                        }
197                    },
198                    None => {
199                        quorum_votes.push(vote1.vote.clone());
200                    },
201                }
202            }
203
204            // Unlike regular votes, we don't have to double check the certificate because votes are
205            // fully checked, including signature in the state_accumulator.
206            if state_cert.is_none()
207                && let Some(cert) = state_accumulator.accumulate(&bls_key, &state_vote, &membership)
208            {
209                state_cert = Some(cert);
210            }
211
212            if let (Some(q), Some(s)) = (&quorum_cert, &state_cert) {
213                return (q.clone(), s.clone());
214            }
215        }
216        // Channel closed without both certs forming; this task is effectively dead.
217        // Await never returns — GC aborts via AbortHandle.
218        futures::future::pending::<()>().await;
219        unreachable!()
220    }
221
222    pub fn gc(&mut self, view: ViewNumber, epoch: EpochNumber) {
223        let keep = self.per_view.split_off(&view);
224        self.completed = self.completed.split_off(&view);
225        for (_, handle) in self.per_view.values_mut() {
226            handle.abort();
227        }
228        self.per_view = keep;
229        self.membership_cache = self.membership_cache.split_off(&epoch);
230    }
231}
232
233fn generate_vote_commitment<T: NodeType, V: Vote<T>>(
234    vote: &V,
235    upgrade_lock: &UpgradeLock<T>,
236) -> Option<committable::Commitment<VersionedVoteData<T, V::Commitment>>> {
237    match VersionedVoteData::new(vote.date().clone(), vote.view_number(), upgrade_lock) {
238        Ok(data) => Some(data.commit()),
239        Err(e) => {
240            tracing::warn!("Failed to generate versioned vote data: {e}");
241            None
242        },
243    }
244}