1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use committable::Committable;
4use hotshot::types::SignatureKey;
5use hotshot_types::{
6 data::{EpochNumber, ViewNumber},
7 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
8 message::UpgradeLock,
9 simple_certificate::{LightClientStateUpdateCertificateV2, QuorumCertificate2},
10 simple_vote::{HasEpoch, QuorumVote2, VersionedVoteData},
11 stake_table::StakeTableEntries,
12 traits::node_implementation::NodeType,
13 vote::{
14 Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
15 },
16};
17use tokio::{
18 sync::mpsc::{self},
19 task::{AbortHandle, JoinSet},
20};
21use tracing::{debug, instrument, warn};
22
23use crate::message::Vote1;
24
25pub struct EpochRootVoteCollector<T: NodeType> {
33 per_view: BTreeMap<ViewNumber, (mpsc::Sender<Vote1<T>>, AbortHandle)>,
34 completed: BTreeSet<ViewNumber>,
35 epoch_membership_coordinator: EpochMembershipCoordinator<T>,
36 membership_cache: BTreeMap<EpochNumber, EpochMembership<T>>,
37 upgrade_lock: UpgradeLock<T>,
38 tasks: JoinSet<(
39 QuorumCertificate2<T>,
40 LightClientStateUpdateCertificateV2<T>,
41 )>,
42}
43
44impl<T: NodeType> EpochRootVoteCollector<T> {
45 #[instrument(level = "debug", skip_all)]
46 pub fn new(
47 epoch_membership_coordinator: EpochMembershipCoordinator<T>,
48 upgrade_lock: UpgradeLock<T>,
49 ) -> Self {
50 Self {
51 per_view: BTreeMap::new(),
52 completed: BTreeSet::new(),
53 epoch_membership_coordinator,
54 membership_cache: BTreeMap::new(),
55 upgrade_lock,
56 tasks: JoinSet::new(),
57 }
58 }
59
60 pub async fn next(
61 &mut self,
62 ) -> Option<(
63 QuorumCertificate2<T>,
64 LightClientStateUpdateCertificateV2<T>,
65 )> {
66 loop {
67 match self.tasks.join_next().await {
68 Some(Ok((cert1, state_cert))) => {
69 let view = cert1.view_number();
70 if self.completed.contains(&view) {
71 continue;
72 }
73 self.completed.insert(view);
74 return Some((cert1, state_cert));
75 },
76 Some(Err(e)) if e.is_cancelled() => {
77 debug!("Epoch-root vote collection task cancelled: {e}");
78 },
79 Some(Err(e)) => {
80 warn!("Error in epoch-root vote collection task: {e}");
81 },
82 None => return None,
83 }
84 }
85 }
86
87 pub async fn accumulate(&mut self, vote1: Vote1<T>) {
90 debug_assert!(
91 vote1.state_vote.is_some(),
92 "EpochRootVoteCollector::accumulate called with a Vote1 missing state_vote"
93 );
94
95 let view = vote1.vote.view_number();
96 if self.completed.contains(&view) {
97 return;
98 }
99 let Some(membership) = self.resolve_membership(&vote1.vote).await else {
100 return;
101 };
102 let (tx, _abort_handle) = self.per_view.entry(view).or_insert_with(|| {
103 let (tx, rx) = mpsc::channel(100);
104 let abort_handle = self.tasks.spawn(Self::run_per_view(
105 rx,
106 membership,
107 self.upgrade_lock.clone(),
108 ));
109 (tx, abort_handle)
110 });
111 let _ = tx.send(vote1).await;
112 }
113
114 async fn resolve_membership(&mut self, vote: &QuorumVote2<T>) -> Option<EpochMembership<T>> {
115 let epoch = vote.epoch()?;
116 if let Some(m) = self.membership_cache.get(&epoch) {
117 return Some(m.clone());
118 }
119 let m = self
120 .epoch_membership_coordinator
121 .membership_for_epoch(Some(epoch))
122 .ok()?;
123 self.membership_cache.insert(epoch, m.clone());
124 Some(m)
125 }
126
127 #[instrument(level = "debug", skip_all)]
128 async fn run_per_view(
129 mut rx: mpsc::Receiver<Vote1<T>>,
130 membership: EpochMembership<T>,
131 lock: UpgradeLock<T>,
132 ) -> (
133 QuorumCertificate2<T>,
134 LightClientStateUpdateCertificateV2<T>,
135 ) {
136 let mut quorum_accumulator =
137 VoteAccumulator::<T, QuorumVote2<T>, QuorumCertificate2<T>>::new(lock.clone());
138 let mut state_accumulator = LightClientStateUpdateVoteAccumulator::<T> {
139 vote_outcomes: HashMap::new(),
140 upgrade_lock: lock.clone(),
141 };
142
143 let mut quorum_cert: Option<QuorumCertificate2<T>> = None;
144 let mut state_cert: Option<LightClientStateUpdateCertificateV2<T>> = None;
145 let mut quorum_votes: Vec<QuorumVote2<T>> = Vec::new();
146
147 while let Some(vote1) = rx.recv().await {
148 let state_vote = match vote1.state_vote.clone() {
149 Some(sv) => sv,
150 None => {
151 tracing::error!(
152 "EpochRootVoteCollector::run_per_view called with a Vote1 missing \
153 state_vote"
154 );
155 continue;
156 },
157 };
158 let bls_key = vote1.vote.signing_key();
159
160 if quorum_cert.is_none() {
161 match quorum_accumulator.accumulate(&vote1.vote, membership.clone()) {
162 Some(cert) => {
163 let stake_table =
164 <QuorumCertificate2<T> as Certificate<T, _>>::stake_table(&membership);
165 let threshold =
166 <QuorumCertificate2<T> as Certificate<T, _>>::threshold(&membership);
167 match cert.is_valid_cert(
168 &StakeTableEntries::<T>::from(stake_table).0,
169 threshold,
170 &lock,
171 ) {
172 Ok(()) => {
173 quorum_cert = Some(cert);
174 },
175 Err(e) => {
176 warn!("Invalid quorum certificate formed at epoch-root view: {e}");
177 quorum_votes.push(vote1.vote.clone());
179 quorum_votes.retain(|v| {
180 let vote_commitment = generate_vote_commitment(v, &lock);
181 vote_commitment.is_some_and(|commitment| {
182 v.signing_key()
183 .validate(&v.signature(), commitment.as_ref())
184 })
185 });
186 quorum_accumulator = VoteAccumulator::new(lock.clone());
187 for v in &quorum_votes {
188 if let Some(cert) =
189 quorum_accumulator.accumulate(v, membership.clone())
190 {
191 quorum_cert = Some(cert);
192 break;
193 }
194 }
195 },
196 }
197 },
198 None => {
199 quorum_votes.push(vote1.vote.clone());
200 },
201 }
202 }
203
204 if state_cert.is_none()
207 && let Some(cert) = state_accumulator.accumulate(&bls_key, &state_vote, &membership)
208 {
209 state_cert = Some(cert);
210 }
211
212 if let (Some(q), Some(s)) = (&quorum_cert, &state_cert) {
213 return (q.clone(), s.clone());
214 }
215 }
216 futures::future::pending::<()>().await;
219 unreachable!()
220 }
221
222 pub fn gc(&mut self, view: ViewNumber, epoch: EpochNumber) {
223 let keep = self.per_view.split_off(&view);
224 self.completed = self.completed.split_off(&view);
225 for (_, handle) in self.per_view.values_mut() {
226 handle.abort();
227 }
228 self.per_view = keep;
229 self.membership_cache = self.membership_cache.split_off(&epoch);
230 }
231}
232
233fn generate_vote_commitment<T: NodeType, V: Vote<T>>(
234 vote: &V,
235 upgrade_lock: &UpgradeLock<T>,
236) -> Option<committable::Commitment<VersionedVoteData<T, V::Commitment>>> {
237 match VersionedVoteData::new(vote.date().clone(), vote.view_number(), upgrade_lock) {
238 Ok(data) => Some(data.commit()),
239 Err(e) => {
240 tracing::warn!("Failed to generate versioned vote data: {e}");
241 None
242 },
243 }
244}