1use std::{collections::HashMap, sync::Arc};
2
3use async_broadcast::{InactiveReceiver, Sender};
4use async_lock::RwLock;
5use committable::Commitment;
6use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream};
7use hotshot::{traits::NodeImplementation, types::SystemContextHandle};
8use hotshot_new_protocol::{
9 client::ClientApi,
10 consensus::{ConsensusOutput, PreCutoverSeed},
11 coordinator::{Coordinator, CoordinatorOutput, error::Severity},
12 cutover::{
13 CutoverGate, extract_pre_cutover_seed, forward_legacy_epoch_changes,
14 forward_legacy_timeout_votes,
15 },
16 network::Network,
17 state::UpdateLeaf,
18 storage::NewProtocolStorage,
19};
20use hotshot_types::{
21 data::{EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewNumber},
22 epoch_membership::EpochMembershipCoordinator,
23 event::{Event, LeafInfo},
24 message::{Proposal as SignedProposal, UpgradeLock, convert_proposal},
25 new_protocol::CoordinatorEvent,
26 traits::{ValidatedState, node_implementation::NodeType, signature_key::SignatureKey},
27 utils::StateAndDelta,
28};
29use tokio::spawn;
30use tokio_util::task::AbortOnDropHandle;
31use versions::NEW_PROTOCOL_VERSION;
32
33fn consensus_event<T, N, S>(
37 coordinator: &Coordinator<T, N, S>,
38 output: &ConsensusOutput<T>,
39) -> Option<CoordinatorEvent<T>>
40where
41 T: NodeType,
42 N: Network<T>,
43 S: NewProtocolStorage<T>,
44{
45 match output {
46 ConsensusOutput::LeafDecided {
47 leaves,
48 cert1,
49 cert2,
50 vid_shares,
51 } => {
52 if leaves.is_empty() {
53 tracing::error!("coordinator emitted LeafDecided with empty leaves");
54 return None;
55 }
56 let leaf_infos = leaves
57 .iter()
58 .zip(vid_shares.iter())
59 .map(|(leaf, vid_share)| {
60 let (state, delta) = match coordinator.state(leaf.view_number()) {
61 Some(s) => (s.state.clone(), s.delta.clone()),
62 None => {
63 let s = Arc::new(T::ValidatedState::from_header(leaf.block_header()));
64 (s, None)
65 },
66 };
67 let vid_share = vid_share
68 .as_ref()
69 .map(|share| VidDisperseShare::V2(share.data.clone()));
70 LeafInfo::new(leaf.clone(), state, delta, vid_share, None)
71 })
72 .collect();
73 Some(CoordinatorEvent::NewDecide {
74 leaf_infos,
75 cert1: cert1.clone(),
76 cert2: cert2.clone(),
77 })
78 },
79 ConsensusOutput::ProposalValidated { proposal, sender } => {
80 Some(CoordinatorEvent::QuorumProposal {
81 proposal: proposal.clone(),
82 sender: sender.clone(),
83 })
84 },
85 ConsensusOutput::BlockPayloadReconstructed {
86 view,
87 header,
88 payload,
89 } => Some(CoordinatorEvent::BlockPayloadReconstructed {
90 view: *view,
91 header: header.clone(),
92 payload: payload.clone(),
93 }),
94 _ => None,
95 }
96}
97
98fn coordinator_event<T, N, S>(
99 coordinator: &Coordinator<T, N, S>,
100 output: &CoordinatorOutput<T>,
101) -> Option<CoordinatorEvent<T>>
102where
103 T: NodeType,
104 N: Network<T>,
105 S: NewProtocolStorage<T>,
106{
107 match output {
108 CoordinatorOutput::Consensus(inner) => consensus_event(coordinator, inner),
109 CoordinatorOutput::ExternalMessageReceived { sender, data } => {
110 Some(CoordinatorEvent::ExternalMessageReceived {
111 sender: sender.clone(),
112 data: data.clone(),
113 })
114 },
115 }
116}
117
118pub struct ConsensusHandle<T: NodeType, I: NodeImplementation<T>> {
119 legacy_handle: Arc<RwLock<SystemContextHandle<T, I>>>,
120 client_api: ClientApi<T>,
121 coordinator_task: AbortOnDropHandle<()>,
122 epoch_height: u64,
123 cutover_gate: CutoverGate,
124 legacy_event_rx: InactiveReceiver<Event<T>>,
125 event_rx: InactiveReceiver<CoordinatorEvent<T>>,
126}
127
128impl<T, I> ConsensusHandle<T, I>
129where
130 T: NodeType,
131 I: NodeImplementation<T>,
132{
133 pub fn new<N>(
134 legacy_handle: Arc<RwLock<SystemContextHandle<T, I>>>,
135 coordinator: Coordinator<T, N, I::Storage>,
136 epoch_height: u64,
137 legacy_event_rx: InactiveReceiver<Event<T>>,
138 event_channel_capacity: usize,
139 ) -> Self
140 where
141 N: Network<T> + Send + 'static,
142 I::Storage: NewProtocolStorage<T>,
143 {
144 let client_api = coordinator.client_api().clone();
145
146 let (mut event_tx, mut event_rx) =
147 async_broadcast::broadcast::<CoordinatorEvent<T>>(event_channel_capacity);
148 event_tx.set_await_active(false);
149 event_rx.set_overflow(true);
150
151 let coordinator_task =
152 AbortOnDropHandle::new(spawn(run_coordinator(coordinator, event_tx)));
153
154 spawn(forward_legacy_timeout_votes(
155 legacy_event_rx.clone(),
156 client_api.clone(),
157 ));
158 spawn(forward_legacy_epoch_changes(
159 legacy_event_rx.clone(),
160 client_api.clone(),
161 epoch_height,
162 ));
163
164 Self {
165 legacy_handle,
166 client_api,
167 coordinator_task,
168 epoch_height,
169 cutover_gate: CutoverGate::new(),
170 legacy_event_rx,
171 event_rx: event_rx.deactivate(),
172 }
173 }
174
175 pub async fn extract_pre_cutover_seed(&self) -> Option<PreCutoverSeed<T>> {
176 let legacy = self.legacy_handle.read().await;
177 extract_pre_cutover_seed(&legacy).await
178 }
179
180 pub fn legacy_consensus(&self) -> Arc<RwLock<SystemContextHandle<T, I>>> {
181 self.legacy_handle.clone()
182 }
183
184 pub fn client_api(&self) -> &ClientApi<T> {
185 &self.client_api
186 }
187
188 async fn at_or_past_cutover(&self, view: ViewNumber) -> bool {
193 self.legacy_handle
194 .read()
195 .await
196 .hotshot
197 .upgrade_lock
198 .version_infallible(view)
199 >= NEW_PROTOCOL_VERSION
200 }
201
202 pub async fn cutover_active(&self) -> bool {
207 if self.cutover_gate.is_active() {
208 return true;
209 }
210 let legacy = self.legacy_handle.read().await;
211 self.cutover_gate.check(&legacy, &self.client_api).await
212 }
213
214 pub fn event_stream(&self) -> BoxStream<'static, CoordinatorEvent<T>> {
215 let old_stream = self
216 .legacy_event_rx
217 .activate_cloned()
218 .map(CoordinatorEvent::LegacyEvent);
219
220 let new_stream = self.event_rx.activate_cloned();
221
222 futures::stream::select(old_stream, new_stream).boxed()
223 }
224
225 pub async fn current_view(&self) -> ViewNumber {
226 if self.cutover_active().await {
227 return self
228 .client_api
229 .current_view()
230 .await
231 .expect("coordinator channel closed");
232 }
233 self.legacy_handle.read().await.cur_view().await
234 }
235
236 pub async fn decided_leaf(&self) -> Leaf2<T> {
237 if self.cutover_active().await {
238 return self
239 .client_api
240 .decided_leaf()
241 .await
242 .expect("coordinator channel closed");
243 }
244 self.legacy_handle.read().await.decided_leaf().await
245 }
246
247 pub async fn decided_state(&self) -> Option<Arc<T::ValidatedState>> {
248 if self.cutover_active().await {
249 return self
250 .client_api
251 .decided_state()
252 .await
253 .expect("coordinator channel closed");
254 }
255 Some(self.legacy_handle.read().await.decided_state().await)
256 }
257
258 pub async fn state(&self, view: ViewNumber) -> Option<Arc<T::ValidatedState>> {
259 if self.at_or_past_cutover(view).await {
260 return self
261 .client_api
262 .state(view)
263 .await
264 .expect("coordinator channel closed");
265 }
266 self.legacy_handle.read().await.state(view).await
267 }
268
269 pub async fn state_and_delta(&self, view: ViewNumber) -> StateAndDelta<T> {
270 if self.at_or_past_cutover(view).await {
271 return self
272 .client_api
273 .state_and_delta(view)
274 .await
275 .expect("coordinator channel closed");
276 }
277 self.legacy_handle
278 .read()
279 .await
280 .hotshot
281 .consensus()
282 .read()
283 .await
284 .state_and_delta(view)
285 }
286
287 pub async fn undecided_leaves(&self) -> Vec<Leaf2<T>> {
288 if self.cutover_active().await {
289 return self
290 .client_api
291 .undecided_leaves()
292 .await
293 .expect("coordinator channel closed");
294 }
295 self.legacy_handle
296 .read()
297 .await
298 .hotshot
299 .consensus()
300 .read()
301 .await
302 .undecided_leaves()
303 }
304
305 pub async fn current_epoch(&self) -> Option<EpochNumber> {
306 if self.cutover_active().await {
307 return self
308 .client_api
309 .current_epoch()
310 .await
311 .expect("coordinator channel closed");
312 }
313 self.legacy_handle.read().await.cur_epoch().await
314 }
315
316 pub async fn epoch_height(&self) -> u64 {
317 if self.cutover_active().await {
318 return self.epoch_height;
319 }
320 self.legacy_handle.read().await.epoch_height
321 }
322
323 pub async fn membership_coordinator(&self) -> EpochMembershipCoordinator<T> {
325 self.legacy_handle
326 .read()
327 .await
328 .membership_coordinator
329 .clone()
330 }
331
332 pub async fn upgrade_lock(&self) -> UpgradeLock<T> {
334 self.legacy_handle.read().await.hotshot.upgrade_lock.clone()
335 }
336
337 pub async fn storage(&self) -> I::Storage {
339 self.legacy_handle.read().await.storage()
340 }
341
342 pub async fn current_proposal_participation(&self) -> HashMap<T::SignatureKey, f64> {
344 self.legacy_handle
345 .read()
346 .await
347 .consensus()
348 .read()
349 .await
350 .current_proposal_participation()
351 }
352
353 pub async fn proposal_participation(
354 &self,
355 epoch: EpochNumber,
356 ) -> HashMap<T::SignatureKey, f64> {
357 self.legacy_handle
358 .read()
359 .await
360 .consensus()
361 .read()
362 .await
363 .proposal_participation(epoch)
364 }
365
366 pub async fn current_vote_participation(
367 &self,
368 ) -> HashMap<<T::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
369 self.legacy_handle
370 .read()
371 .await
372 .consensus()
373 .read()
374 .await
375 .current_vote_participation()
376 }
377
378 pub async fn vote_participation(
379 &self,
380 epoch: Option<EpochNumber>,
381 ) -> HashMap<<T::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
382 self.legacy_handle
383 .read()
384 .await
385 .consensus()
386 .read()
387 .await
388 .vote_participation(epoch)
389 }
390
391 pub async fn request_proposal(
392 &self,
393 view: ViewNumber,
394 leaf_commitment: Commitment<Leaf2<T>>,
395 ) -> anyhow::Result<
396 BoxFuture<'static, anyhow::Result<SignedProposal<T, QuorumProposalWrapper<T>>>>,
397 > {
398 if self.at_or_past_cutover(view).await {
399 let client_api = self.client_api.clone();
400 return Ok(async move {
401 client_api
402 .request_proposal(view, leaf_commitment)
403 .await
404 .map(convert_proposal)
405 .map_err(|err| anyhow::anyhow!("{err}"))
406 }
407 .boxed());
408 }
409
410 let future = self
411 .legacy_handle
412 .read()
413 .await
414 .request_proposal(view, leaf_commitment)
415 .map_err(|e| anyhow::anyhow!("{e}"))?;
416 Ok(async move { future.await.map_err(|e| anyhow::anyhow!("{e}")) }.boxed())
417 }
418
419 pub async fn submit_transaction(&self, tx: T::Transaction) -> anyhow::Result<()> {
420 let view = self.current_view().await;
421 if self.at_or_past_cutover(view).await {
422 return self
423 .client_api
424 .submit_transaction(tx)
425 .await
426 .map_err(|e| anyhow::anyhow!("{e}"));
427 }
428 self.legacy_handle
429 .read()
430 .await
431 .submit_transaction(tx)
432 .await
433 .map_err(|e| anyhow::anyhow!("{e}"))
434 }
435
436 pub async fn update_leaf(
437 &self,
438 leaf: Leaf2<T>,
439 state: Arc<T::ValidatedState>,
440 delta: Option<Arc<<T::ValidatedState as ValidatedState<T>>::Delta>>,
441 ) -> anyhow::Result<()> {
442 let view = leaf.view_number();
443 if self.at_or_past_cutover(view).await {
444 return self
445 .client_api
446 .update_leaf(UpdateLeaf {
447 view,
448 leaf,
449 state,
450 delta,
451 })
452 .await
453 .map_err(|e| anyhow::anyhow!("{e}"));
454 }
455 self.legacy_handle
456 .read()
457 .await
458 .hotshot
459 .consensus()
460 .write()
461 .await
462 .update_leaf(leaf, state, delta)
463 .map_err(|e| anyhow::anyhow!("{e}"))
464 }
465
466 pub async fn start_consensus(&self) {
467 if self.cutover_active().await {
468 return;
471 }
472 self.legacy_handle
473 .read()
474 .await
475 .hotshot
476 .start_consensus()
477 .await;
478 }
479
480 pub async fn shut_down(&self) {
481 self.coordinator_task.abort();
482 self.legacy_handle.write().await.shut_down().await;
483 }
484}
485
486async fn run_coordinator<T, N, S>(mut coord: Coordinator<T, N, S>, tx: Sender<CoordinatorEvent<T>>)
487where
488 T: NodeType,
489 N: Network<T>,
490 S: NewProtocolStorage<T>,
491{
492 coord.start();
493
494 loop {
495 match coord.next_consensus_input().await {
496 Ok(input) => coord.apply_consensus(input),
497 Err(err) if err.severity == Severity::Critical => {
498 tracing::error!(%err, "coordinator: critical error");
499 return;
500 },
501 Err(err) => {
502 tracing::warn!(%err, "coordinator: non-critical error");
503 },
504 }
505 while let Some(output) = coord.outbox_mut().pop_front() {
506 if let Some(event) = consensus_event(&coord, &output) {
507 broadcast_event(&tx, event).await;
508 }
509 if let Err(err) = coord.process_consensus_output(output) {
510 if err.severity == Severity::Critical {
511 tracing::error!(%err, "coordinator: critical error processing output");
512 return;
513 } else {
514 tracing::warn!(%err, "coordinator: error processing output");
515 }
516 }
517 }
518 while let Some(output) = coord.coordinator_outbox_mut().pop_front() {
519 if let Some(event) = coordinator_event(&coord, &output) {
520 broadcast_event(&tx, event).await;
521 }
522 }
523 }
524}
525
526async fn broadcast_event<T>(sender: &Sender<CoordinatorEvent<T>>, event: CoordinatorEvent<T>)
527where
528 T: NodeType,
529{
530 match sender.broadcast_direct(event).await {
531 Ok(None) => {},
532 Ok(Some(overflowed)) => {
533 tracing::warn!(
534 %overflowed,
535 "coordinator event channel overflow, oldest event dropped"
536 );
537 },
538 Err(err) => {
539 tracing::warn!(%err, "failed to broadcast consensus event");
540 },
541 }
542}