// hotshot_task_impls/quorum_proposal_recv/mod.rs
#![allow(unused_imports)]
8
9use std::{collections::BTreeMap, sync::Arc};
10
11use async_broadcast::{Receiver, Sender, broadcast};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use either::Either;
15use futures::future::{err, join_all};
16use hotshot_task::task::{Task, TaskState};
17use hotshot_types::{
18 consensus::{Consensus, OuterConsensus},
19 data::{EpochNumber, Leaf, ViewChangeEvidence2, ViewNumber},
20 epoch_membership::{self, EpochMembership, EpochMembershipCoordinator},
21 event::Event,
22 message::UpgradeLock,
23 simple_certificate::UpgradeCertificate,
24 simple_vote::HasEpoch,
25 traits::{
26 block_contents::BlockHeader,
27 node_implementation::{NodeImplementation, NodeType},
28 signature_key::SignatureKey,
29 },
30 utils::option_epoch_from_block_number,
31 vote::{Certificate, HasViewNumber},
32};
33use hotshot_utils::anytrace::{Result, bail};
34use tokio::task::JoinHandle;
35use tracing::{debug, error, info, instrument, warn};
36use vbs::version::Version;
37
38use self::handlers::handle_quorum_proposal_recv;
39use crate::{
40 events::{HotShotEvent, ProposalMissing},
41 helpers::{broadcast_event, fetch_proposal, parent_leaf_and_state},
42};
43mod handlers;
45
46pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
49 pub public_key: TYPES::SignatureKey,
51
52 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
54
55 pub consensus: OuterConsensus<TYPES>,
57
58 pub cur_view: ViewNumber,
60
61 pub cur_epoch: Option<EpochNumber>,
63
64 pub membership: EpochMembershipCoordinator<TYPES>,
66
67 pub timeout: u64,
69
70 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
72
73 pub storage: I::Storage,
75
76 pub spawned_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
79
80 pub id: u64,
82
83 pub upgrade_lock: UpgradeLock<TYPES>,
85
86 pub epoch_height: u64,
88
89 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
91}
92
93pub(crate) struct ValidationInfo<TYPES: NodeType, I: NodeImplementation<TYPES>> {
96 pub id: u64,
98
99 pub(crate) public_key: TYPES::SignatureKey,
101
102 pub(crate) private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
104
105 pub(crate) consensus: OuterConsensus<TYPES>,
107
108 pub membership: EpochMembership<TYPES>,
110
111 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
113
114 pub(crate) storage: I::Storage,
116
117 pub(crate) upgrade_lock: UpgradeLock<TYPES>,
119
120 pub epoch_height: u64,
122
123 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
125}
126
127impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalRecvTaskState<TYPES, I> {
128 pub fn cancel_tasks(&mut self, view: ViewNumber) {
130 let keep = self.spawned_tasks.split_off(&view);
131 while let Some((_, tasks)) = self.spawned_tasks.pop_first() {
132 for task in tasks {
133 task.abort();
134 }
135 }
136 self.spawned_tasks = keep;
137 }
138
139 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error")]
141 #[allow(unused_variables)]
142 pub async fn handle(
143 &mut self,
144 event: Arc<HotShotEvent<TYPES>>,
145 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
146 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
147 ) {
148 match event.as_ref() {
149 HotShotEvent::QuorumProposalRecv(proposal, sender) => {
150 tracing::debug!(
151 "Quorum proposal recv for view {}",
152 proposal.data.view_number()
153 );
154 if self.consensus.read().await.cur_view() > proposal.data.view_number() + 1
155 || self.cur_view > proposal.data.view_number() + 1
156 {
157 tracing::warn!(
158 "Throwing away old proposal for view {}",
159 proposal.data.view_number()
160 );
161 return;
162 }
163 let proposal_epoch = option_epoch_from_block_number(
164 proposal.data.proposal.epoch().is_some(),
165 proposal.data.block_header().block_number(),
166 self.epoch_height,
167 );
168 let Ok(epoch_membership) = self.membership.membership_for_epoch(proposal_epoch)
169 else {
170 tracing::warn!("No Stake table for epoch = {proposal_epoch:?}");
171 return;
172 };
173 let validation_info = ValidationInfo::<TYPES, I> {
174 id: self.id,
175 public_key: self.public_key.clone(),
176 private_key: self.private_key.clone(),
177 consensus: self.consensus.clone(),
178 membership: epoch_membership,
179 output_event_stream: self.output_event_stream.clone(),
180 storage: self.storage.clone(),
181 upgrade_lock: self.upgrade_lock.clone(),
182 epoch_height: self.epoch_height,
183 first_epoch: self.first_epoch,
184 };
185 match handle_quorum_proposal_recv(
186 proposal,
187 sender,
188 &event_sender,
189 &event_receiver,
190 validation_info,
191 )
192 .await
193 {
194 Ok(()) => {},
195 Err(e) => tracing::error!(?e, "Failed to validate the proposal"),
196 }
197 },
198 HotShotEvent::ViewChange(view, epoch) => {
199 if *epoch > self.cur_epoch {
200 self.cur_epoch = *epoch;
201 }
202 if self.cur_view >= *view {
203 return;
204 }
205 self.cur_view = *view;
206 let oldest_view_to_keep = ViewNumber::new(view.saturating_sub(1));
211 self.cancel_tasks(oldest_view_to_keep);
212 },
213 HotShotEvent::SetFirstEpoch(view, epoch) => {
214 self.first_epoch = Some((*view, *epoch));
215 },
216 _ => {},
217 }
218 }
219}
220
221#[async_trait]
222impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState
223 for QuorumProposalRecvTaskState<TYPES, I>
224{
225 type Event = HotShotEvent<TYPES>;
226
227 async fn handle_event(
228 &mut self,
229 event: Arc<Self::Event>,
230 sender: &Sender<Arc<Self::Event>>,
231 receiver: &Receiver<Arc<Self::Event>>,
232 ) -> Result<()> {
233 if self.upgrade_lock.new_protocol_active(self.cur_view) {
234 return Ok(());
235 }
236 self.handle(event, sender.clone(), receiver.clone()).await;
237
238 Ok(())
239 }
240
241 fn cancel_subtasks(&mut self) {
242 while !self.spawned_tasks.is_empty() {
243 let Some((_, handles)) = self.spawned_tasks.pop_first() else {
244 break;
245 };
246 for handle in handles {
247 handle.abort();
248 }
249 }
250 }
251}