// espresso_node/consensus_handle.rs

1use std::{collections::HashMap, sync::Arc};
2
3use async_broadcast::{InactiveReceiver, Sender};
4use async_lock::RwLock;
5use committable::Commitment;
6use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream};
7use hotshot::{traits::NodeImplementation, types::SystemContextHandle};
8use hotshot_new_protocol::{
9    client::ClientApi,
10    consensus::{ConsensusOutput, PreCutoverSeed},
11    coordinator::{Coordinator, CoordinatorOutput, error::Severity},
12    cutover::{
13        CutoverGate, extract_pre_cutover_seed, forward_legacy_epoch_changes,
14        forward_legacy_timeout_votes,
15    },
16    network::Network,
17    state::UpdateLeaf,
18    storage::NewProtocolStorage,
19};
20use hotshot_types::{
21    data::{EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewNumber},
22    epoch_membership::EpochMembershipCoordinator,
23    event::{Event, LeafInfo},
24    message::{Proposal as SignedProposal, UpgradeLock, convert_proposal},
25    new_protocol::CoordinatorEvent,
26    traits::{ValidatedState, node_implementation::NodeType, signature_key::SignatureKey},
27    utils::StateAndDelta,
28};
29use tokio::spawn;
30use tokio_util::task::AbortOnDropHandle;
31use versions::NEW_PROTOCOL_VERSION;
32
33// TODO: `ConsensusOutput::LeafDecided` still carries fields (leaves +
34// vid_shares) rather than a `Vec<LeafInfo>`. This is because `Consensus` doesn't own `StateManager`
35// state and delta only become available one level up, in `Coordinator`.
36fn consensus_event<T, N, S>(
37    coordinator: &Coordinator<T, N, S>,
38    output: &ConsensusOutput<T>,
39) -> Option<CoordinatorEvent<T>>
40where
41    T: NodeType,
42    N: Network<T>,
43    S: NewProtocolStorage<T>,
44{
45    match output {
46        ConsensusOutput::LeafDecided {
47            leaves,
48            cert1,
49            cert2,
50            vid_shares,
51        } => {
52            if leaves.is_empty() {
53                tracing::error!("coordinator emitted LeafDecided with empty leaves");
54                return None;
55            }
56            let leaf_infos = leaves
57                .iter()
58                .zip(vid_shares.iter())
59                .map(|(leaf, vid_share)| {
60                    let (state, delta) = match coordinator.state(leaf.view_number()) {
61                        Some(s) => (s.state.clone(), s.delta.clone()),
62                        None => {
63                            let s = Arc::new(T::ValidatedState::from_header(leaf.block_header()));
64                            (s, None)
65                        },
66                    };
67                    let vid_share = vid_share
68                        .as_ref()
69                        .map(|share| VidDisperseShare::V2(share.data.clone()));
70                    LeafInfo::new(leaf.clone(), state, delta, vid_share, None)
71                })
72                .collect();
73            Some(CoordinatorEvent::NewDecide {
74                leaf_infos,
75                cert1: cert1.clone(),
76                cert2: cert2.clone(),
77            })
78        },
79        ConsensusOutput::ProposalValidated { proposal, sender } => {
80            Some(CoordinatorEvent::QuorumProposal {
81                proposal: proposal.clone(),
82                sender: sender.clone(),
83            })
84        },
85        ConsensusOutput::BlockPayloadReconstructed {
86            view,
87            header,
88            payload,
89        } => Some(CoordinatorEvent::BlockPayloadReconstructed {
90            view: *view,
91            header: header.clone(),
92            payload: payload.clone(),
93        }),
94        _ => None,
95    }
96}
97
98fn coordinator_event<T, N, S>(
99    coordinator: &Coordinator<T, N, S>,
100    output: &CoordinatorOutput<T>,
101) -> Option<CoordinatorEvent<T>>
102where
103    T: NodeType,
104    N: Network<T>,
105    S: NewProtocolStorage<T>,
106{
107    match output {
108        CoordinatorOutput::Consensus(inner) => consensus_event(coordinator, inner),
109        CoordinatorOutput::ExternalMessageReceived { sender, data } => {
110            Some(CoordinatorEvent::ExternalMessageReceived {
111                sender: sender.clone(),
112                data: data.clone(),
113            })
114        },
115    }
116}
117
118pub struct ConsensusHandle<T: NodeType, I: NodeImplementation<T>> {
119    legacy_handle: Arc<RwLock<SystemContextHandle<T, I>>>,
120    client_api: ClientApi<T>,
121    coordinator_task: AbortOnDropHandle<()>,
122    epoch_height: u64,
123    cutover_gate: CutoverGate,
124    legacy_event_rx: InactiveReceiver<Event<T>>,
125    event_rx: InactiveReceiver<CoordinatorEvent<T>>,
126}
127
128impl<T, I> ConsensusHandle<T, I>
129where
130    T: NodeType,
131    I: NodeImplementation<T>,
132{
133    pub fn new<N>(
134        legacy_handle: Arc<RwLock<SystemContextHandle<T, I>>>,
135        coordinator: Coordinator<T, N, I::Storage>,
136        epoch_height: u64,
137        legacy_event_rx: InactiveReceiver<Event<T>>,
138        event_channel_capacity: usize,
139    ) -> Self
140    where
141        N: Network<T> + Send + 'static,
142        I::Storage: NewProtocolStorage<T>,
143    {
144        let client_api = coordinator.client_api().clone();
145
146        let (mut event_tx, mut event_rx) =
147            async_broadcast::broadcast::<CoordinatorEvent<T>>(event_channel_capacity);
148        event_tx.set_await_active(false);
149        event_rx.set_overflow(true);
150
151        let coordinator_task =
152            AbortOnDropHandle::new(spawn(run_coordinator(coordinator, event_tx)));
153
154        spawn(forward_legacy_timeout_votes(
155            legacy_event_rx.clone(),
156            client_api.clone(),
157        ));
158        spawn(forward_legacy_epoch_changes(
159            legacy_event_rx.clone(),
160            client_api.clone(),
161            epoch_height,
162        ));
163
164        Self {
165            legacy_handle,
166            client_api,
167            coordinator_task,
168            epoch_height,
169            cutover_gate: CutoverGate::new(),
170            legacy_event_rx,
171            event_rx: event_rx.deactivate(),
172        }
173    }
174
175    pub async fn extract_pre_cutover_seed(&self) -> Option<PreCutoverSeed<T>> {
176        let legacy = self.legacy_handle.read().await;
177        extract_pre_cutover_seed(&legacy).await
178    }
179
180    pub fn legacy_consensus(&self) -> Arc<RwLock<SystemContextHandle<T, I>>> {
181        self.legacy_handle.clone()
182    }
183
184    pub fn client_api(&self) -> &ClientApi<T> {
185        &self.client_api
186    }
187
188    /// Whether `view` is at or past the new-protocol upgrade boundary,
189    /// according to the legacy upgrade lock. This is a stateless version
190    /// check — use it when routing per-view queries like `state(view)`.
191    /// For "should we route to the coordinator?" use [`cutover_active`](Self::cutover_active).
192    async fn at_or_past_cutover(&self, view: ViewNumber) -> bool {
193        self.legacy_handle
194            .read()
195            .await
196            .hotshot
197            .upgrade_lock
198            .version_infallible(view)
199            >= NEW_PROTOCOL_VERSION
200    }
201
202    /// Whether the cutover has happened — the gate has latched on this
203    /// node. Stateful: the first call after legacy crosses the cutover
204    /// view triggers seed extraction + dispatch. Use this for "should
205    /// we route to the coordinator?" decisions.
206    pub async fn cutover_active(&self) -> bool {
207        if self.cutover_gate.is_active() {
208            return true;
209        }
210        let legacy = self.legacy_handle.read().await;
211        self.cutover_gate.check(&legacy, &self.client_api).await
212    }
213
214    pub fn event_stream(&self) -> BoxStream<'static, CoordinatorEvent<T>> {
215        let old_stream = self
216            .legacy_event_rx
217            .activate_cloned()
218            .map(CoordinatorEvent::LegacyEvent);
219
220        let new_stream = self.event_rx.activate_cloned();
221
222        futures::stream::select(old_stream, new_stream).boxed()
223    }
224
225    pub async fn current_view(&self) -> ViewNumber {
226        if self.cutover_active().await {
227            return self
228                .client_api
229                .current_view()
230                .await
231                .expect("coordinator channel closed");
232        }
233        self.legacy_handle.read().await.cur_view().await
234    }
235
236    pub async fn decided_leaf(&self) -> Leaf2<T> {
237        if self.cutover_active().await {
238            return self
239                .client_api
240                .decided_leaf()
241                .await
242                .expect("coordinator channel closed");
243        }
244        self.legacy_handle.read().await.decided_leaf().await
245    }
246
247    pub async fn decided_state(&self) -> Option<Arc<T::ValidatedState>> {
248        if self.cutover_active().await {
249            return self
250                .client_api
251                .decided_state()
252                .await
253                .expect("coordinator channel closed");
254        }
255        Some(self.legacy_handle.read().await.decided_state().await)
256    }
257
258    pub async fn state(&self, view: ViewNumber) -> Option<Arc<T::ValidatedState>> {
259        if self.at_or_past_cutover(view).await {
260            return self
261                .client_api
262                .state(view)
263                .await
264                .expect("coordinator channel closed");
265        }
266        self.legacy_handle.read().await.state(view).await
267    }
268
269    pub async fn state_and_delta(&self, view: ViewNumber) -> StateAndDelta<T> {
270        if self.at_or_past_cutover(view).await {
271            return self
272                .client_api
273                .state_and_delta(view)
274                .await
275                .expect("coordinator channel closed");
276        }
277        self.legacy_handle
278            .read()
279            .await
280            .hotshot
281            .consensus()
282            .read()
283            .await
284            .state_and_delta(view)
285    }
286
287    pub async fn undecided_leaves(&self) -> Vec<Leaf2<T>> {
288        if self.cutover_active().await {
289            return self
290                .client_api
291                .undecided_leaves()
292                .await
293                .expect("coordinator channel closed");
294        }
295        self.legacy_handle
296            .read()
297            .await
298            .hotshot
299            .consensus()
300            .read()
301            .await
302            .undecided_leaves()
303    }
304
305    pub async fn current_epoch(&self) -> Option<EpochNumber> {
306        if self.cutover_active().await {
307            return self
308                .client_api
309                .current_epoch()
310                .await
311                .expect("coordinator channel closed");
312        }
313        self.legacy_handle.read().await.cur_epoch().await
314    }
315
316    pub async fn epoch_height(&self) -> u64 {
317        if self.cutover_active().await {
318            return self.epoch_height;
319        }
320        self.legacy_handle.read().await.epoch_height
321    }
322
323    // TODO: implement for new protocol
324    pub async fn membership_coordinator(&self) -> EpochMembershipCoordinator<T> {
325        self.legacy_handle
326            .read()
327            .await
328            .membership_coordinator
329            .clone()
330    }
331
332    // TODO: implement for new protocol
333    pub async fn upgrade_lock(&self) -> UpgradeLock<T> {
334        self.legacy_handle.read().await.hotshot.upgrade_lock.clone()
335    }
336
337    // TODO: implement for new protocol
338    pub async fn storage(&self) -> I::Storage {
339        self.legacy_handle.read().await.storage()
340    }
341
342    // TODO: implement for new protocol
343    pub async fn current_proposal_participation(&self) -> HashMap<T::SignatureKey, f64> {
344        self.legacy_handle
345            .read()
346            .await
347            .consensus()
348            .read()
349            .await
350            .current_proposal_participation()
351    }
352
353    pub async fn proposal_participation(
354        &self,
355        epoch: EpochNumber,
356    ) -> HashMap<T::SignatureKey, f64> {
357        self.legacy_handle
358            .read()
359            .await
360            .consensus()
361            .read()
362            .await
363            .proposal_participation(epoch)
364    }
365
366    pub async fn current_vote_participation(
367        &self,
368    ) -> HashMap<<T::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
369        self.legacy_handle
370            .read()
371            .await
372            .consensus()
373            .read()
374            .await
375            .current_vote_participation()
376    }
377
378    pub async fn vote_participation(
379        &self,
380        epoch: Option<EpochNumber>,
381    ) -> HashMap<<T::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
382        self.legacy_handle
383            .read()
384            .await
385            .consensus()
386            .read()
387            .await
388            .vote_participation(epoch)
389    }
390
391    pub async fn request_proposal(
392        &self,
393        view: ViewNumber,
394        leaf_commitment: Commitment<Leaf2<T>>,
395    ) -> anyhow::Result<
396        BoxFuture<'static, anyhow::Result<SignedProposal<T, QuorumProposalWrapper<T>>>>,
397    > {
398        if self.at_or_past_cutover(view).await {
399            let client_api = self.client_api.clone();
400            return Ok(async move {
401                client_api
402                    .request_proposal(view, leaf_commitment)
403                    .await
404                    .map(convert_proposal)
405                    .map_err(|err| anyhow::anyhow!("{err}"))
406            }
407            .boxed());
408        }
409
410        let future = self
411            .legacy_handle
412            .read()
413            .await
414            .request_proposal(view, leaf_commitment)
415            .map_err(|e| anyhow::anyhow!("{e}"))?;
416        Ok(async move { future.await.map_err(|e| anyhow::anyhow!("{e}")) }.boxed())
417    }
418
419    pub async fn submit_transaction(&self, tx: T::Transaction) -> anyhow::Result<()> {
420        let view = self.current_view().await;
421        if self.at_or_past_cutover(view).await {
422            return self
423                .client_api
424                .submit_transaction(tx)
425                .await
426                .map_err(|e| anyhow::anyhow!("{e}"));
427        }
428        self.legacy_handle
429            .read()
430            .await
431            .submit_transaction(tx)
432            .await
433            .map_err(|e| anyhow::anyhow!("{e}"))
434    }
435
436    pub async fn update_leaf(
437        &self,
438        leaf: Leaf2<T>,
439        state: Arc<T::ValidatedState>,
440        delta: Option<Arc<<T::ValidatedState as ValidatedState<T>>::Delta>>,
441    ) -> anyhow::Result<()> {
442        let view = leaf.view_number();
443        if self.at_or_past_cutover(view).await {
444            return self
445                .client_api
446                .update_leaf(UpdateLeaf {
447                    view,
448                    leaf,
449                    state,
450                    delta,
451                })
452                .await
453                .map_err(|e| anyhow::anyhow!("{e}"));
454        }
455        self.legacy_handle
456            .read()
457            .await
458            .hotshot
459            .consensus()
460            .write()
461            .await
462            .update_leaf(leaf, state, delta)
463            .map_err(|e| anyhow::anyhow!("{e}"))
464    }
465
466    pub async fn start_consensus(&self) {
467        if self.cutover_active().await {
468            // New protocol consensus is already running via the coordinator task.
469            // Don't start legacy HotShot consensus tasks.
470            return;
471        }
472        self.legacy_handle
473            .read()
474            .await
475            .hotshot
476            .start_consensus()
477            .await;
478    }
479
480    pub async fn shut_down(&self) {
481        self.coordinator_task.abort();
482        self.legacy_handle.write().await.shut_down().await;
483    }
484}
485
486async fn run_coordinator<T, N, S>(mut coord: Coordinator<T, N, S>, tx: Sender<CoordinatorEvent<T>>)
487where
488    T: NodeType,
489    N: Network<T>,
490    S: NewProtocolStorage<T>,
491{
492    coord.start();
493
494    loop {
495        match coord.next_consensus_input().await {
496            Ok(input) => coord.apply_consensus(input),
497            Err(err) if err.severity == Severity::Critical => {
498                tracing::error!(%err, "coordinator: critical error");
499                return;
500            },
501            Err(err) => {
502                tracing::warn!(%err, "coordinator: non-critical error");
503            },
504        }
505        while let Some(output) = coord.outbox_mut().pop_front() {
506            if let Some(event) = consensus_event(&coord, &output) {
507                broadcast_event(&tx, event).await;
508            }
509            if let Err(err) = coord.process_consensus_output(output) {
510                if err.severity == Severity::Critical {
511                    tracing::error!(%err, "coordinator: critical error processing output");
512                    return;
513                } else {
514                    tracing::warn!(%err, "coordinator: error processing output");
515                }
516            }
517        }
518        while let Some(output) = coord.coordinator_outbox_mut().pop_front() {
519            if let Some(event) = coordinator_event(&coord, &output) {
520                broadcast_event(&tx, event).await;
521            }
522        }
523    }
524}
525
526async fn broadcast_event<T>(sender: &Sender<CoordinatorEvent<T>>, event: CoordinatorEvent<T>)
527where
528    T: NodeType,
529{
530    match sender.broadcast_direct(event).await {
531        Ok(None) => {},
532        Ok(Some(overflowed)) => {
533            tracing::warn!(
534                %overflowed,
535                "coordinator event channel overflow, oldest event dropped"
536            );
537        },
538        Err(err) => {
539            tracing::warn!(%err, "failed to broadcast consensus event");
540        },
541    }
542}