Skip to main content

hotshot_new_protocol/
coordinator.rs

1pub mod error;
2pub mod timer;
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    sync::Arc,
7    time::Duration,
8};
9
10use bon::{Builder, bon};
11use committable::Commitment;
12use hotshot::{HotShotInitializer, traits::BlockPayload, types::SignatureKey};
13use hotshot_types::{
14    data::{EpochNumber, Leaf2, VidCommitment, VidDisperseShare2, ViewNumber},
15    epoch_membership::EpochMembershipCoordinator,
16    message::{Proposal as SignedProposal, UpgradeLock},
17    simple_certificate::{QuorumCertificate2, TimeoutCertificate2},
18    simple_vote::{HasEpoch, QuorumVote2, TimeoutVote2},
19    traits::{
20        block_contents::BlockHeader, node_implementation::NodeType,
21        signature_key::StateSignatureKey,
22    },
23    utils::{epoch_from_block_number, is_epoch_root},
24    vote::HasViewNumber,
25};
26use tokio::{select, sync::oneshot};
27use tracing::{error, info, warn};
28
29use crate::{
30    block::{BlockAndHeaderRequest, BlockBuilder, BlockBuilderConfig},
31    client::{ClientApi, ClientRequest, CoordinatorClient, QueryError},
32    consensus::{Consensus, ConsensusInput, ConsensusOutput},
33    coordinator::{
34        error::{CoordinatorError, ErrorSource, Severity},
35        timer::Timer,
36    },
37    epoch::{EpochManager, EpochRootResult},
38    epoch_root_vote_collector::EpochRootVoteCollector,
39    helpers::proposal_commitment,
40    logging::KeyPrefix,
41    message::{
42        self, BlockMessage, Certificate2, CheckpointCertificate, CheckpointVote, ConsensusMessage,
43        Message, MessageType, Proposal, ProposalFetchMessage, ProposalMessage, TimeoutOneHonest,
44        TransactionMessage, Unchecked, Vote2,
45    },
46    network::Network,
47    outbox::Outbox,
48    proposal::{ProposalValidator, ValidatedProposal, VidShareValidator},
49    state::{HeaderRequest, StateEntry, StateManager, StateManagerOutput},
50    storage::{NewProtocolStorage, Storage},
51    vid::{VidDisperseRequest, VidDisperser, VidReconstructor},
52    vote::VoteCollector,
53};
54
55#[allow(clippy::large_enum_variant)]
56pub enum CoordinatorOutput<T: NodeType> {
57    Consensus(ConsensusOutput<T>),
58    ExternalMessageReceived {
59        sender: T::SignatureKey,
60        data: Vec<u8>,
61    },
62}
63
64#[derive(Builder)]
65pub struct Coordinator<T: NodeType, N, S> {
66    membership_coordinator: EpochMembershipCoordinator<T>,
67    consensus: Consensus<T>,
68    network: N,
69    state_manager: StateManager<T>,
70    #[builder(default)]
71    client: CoordinatorClient<T>,
72    vid_disperser: VidDisperser<T>,
73    vid_reconstructor: VidReconstructor<T>,
74    vote1_collector: VoteCollector<T, QuorumVote2<T>, QuorumCertificate2<T>>,
75    vote2_collector: VoteCollector<T, Vote2<T>, Certificate2<T>>,
76    timeout_collector: VoteCollector<T, TimeoutVote2<T>, TimeoutCertificate2<T>>,
77    timeout_one_honest_collector: VoteCollector<T, TimeoutVote2<T>, TimeoutOneHonest<T>>,
78    checkpoint_collector: VoteCollector<T, CheckpointVote<T>, CheckpointCertificate<T>>,
79    epoch_root_collector: EpochRootVoteCollector<T>,
80    epoch_manager: EpochManager<T>,
81    block_builder: BlockBuilder<T>,
82    proposal_validator: ProposalValidator<T>,
83    share_validator: VidShareValidator<T>,
84    storage: Storage<T, S>,
85    #[builder(default)]
86    outbox: Outbox<ConsensusOutput<T>>,
87    #[builder(default)]
88    coordinator_outbox: Outbox<CoordinatorOutput<T>>,
89    public_key: T::SignatureKey,
90    #[builder(default = KeyPrefix::from(&public_key))]
91    node_id: KeyPrefix,
92    timer: Timer,
93    #[builder(skip)]
94    pending_proposal_fetches: PendingProposalFetches<T>,
95    #[builder(default)]
96    cached_validated_proposals: BTreeMap<ViewNumber, ValidatedProposal<T>>,
97    #[builder(default)]
98    cached_vid_shares: BTreeMap<ViewNumber, VidDisperseShare2<T>>,
99}
100
101#[bon]
102impl<T, N, S> Coordinator<T, N, S>
103where
104    T: NodeType,
105    N: Network<T>,
106    S: NewProtocolStorage<T>,
107{
108    #[builder(builder_type = CoordinatorMaker, finish_fn = make)]
109    #[allow(clippy::too_many_arguments)]
110    pub fn maker(
111        membership_coordinator: EpochMembershipCoordinator<T>,
112        network: N,
113        initializer: &HotShotInitializer<T>,
114        upgrade_lock: UpgradeLock<T>,
115        public_key: T::SignatureKey,
116        private_key: <T::SignatureKey as SignatureKey>::PrivateKey,
117        state_private_key: <T::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
118        stake_table_capacity: usize,
119        timeout_duration: Duration,
120        storage: S,
121        garbage_collection_interval: u64,
122    ) -> Self {
123        let mut consensus = Consensus::new(
124            membership_coordinator.clone(),
125            public_key.clone(),
126            private_key.clone(),
127            state_private_key,
128            stake_table_capacity,
129            upgrade_lock.clone(),
130            initializer.anchor_leaf.clone(),
131            initializer.epoch_height,
132            garbage_collection_interval,
133        );
134
135        let genesis_cert1 = initializer.high_qc.clone();
136        let genesis_proposal = message::Proposal {
137            block_header: initializer.anchor_leaf.block_header().clone(),
138            view_number: ViewNumber::genesis(),
139            epoch: EpochNumber::genesis(),
140            justify_qc: genesis_cert1.clone(),
141            next_epoch_justify_qc: None,
142            upgrade_certificate: None,
143            view_change_evidence: None,
144            next_drb_result: None,
145            state_cert: None,
146        };
147        let mut state_manager = StateManager::new(
148            Arc::new(initializer.instance_state.clone()),
149            upgrade_lock.clone(),
150        );
151        state_manager.seed_state(
152            initializer.anchor_leaf.view_number(),
153            initializer.anchor_state.clone(),
154            initializer.anchor_leaf.clone(),
155        );
156        // The synthetic genesis proposal has a non-null justify_qc (the genesis
157        // cert1) so the leaf derived from it has a different commitment than
158        // the anchor leaf produced by `Leaf2::genesis`. `request_header` for
159        // view 1 looks up the parent state by the *proposal's* leaf
160        // commitment, so seed the same state under that commitment too.
161        state_manager.seed_state(
162            ViewNumber::genesis(),
163            initializer.anchor_state.clone(),
164            Leaf2::from(genesis_proposal.clone()),
165        );
166        consensus.seed_genesis(genesis_cert1, genesis_proposal);
167
168        let lock = upgrade_lock.clone();
169        Self::builder()
170            .consensus(consensus)
171            .network(network)
172            .state_manager(state_manager)
173            .vid_disperser(VidDisperser::new(membership_coordinator.clone()))
174            .vid_reconstructor(VidReconstructor::new())
175            .vote1_collector(VoteCollector::new(
176                membership_coordinator.clone(),
177                lock.clone(),
178            ))
179            .vote2_collector(VoteCollector::new(
180                membership_coordinator.clone(),
181                lock.clone(),
182            ))
183            .timeout_collector(VoteCollector::new(
184                membership_coordinator.clone(),
185                lock.clone(),
186            ))
187            .timeout_one_honest_collector(VoteCollector::new(
188                membership_coordinator.clone(),
189                lock.clone(),
190            ))
191            .checkpoint_collector(VoteCollector::new(
192                membership_coordinator.clone(),
193                lock.clone(),
194            ))
195            .epoch_root_collector(EpochRootVoteCollector::new(
196                membership_coordinator.clone(),
197                lock,
198            ))
199            .epoch_manager(EpochManager::new(
200                initializer.epoch_height,
201                membership_coordinator.clone(),
202            ))
203            .block_builder(BlockBuilder::new(
204                Arc::new(initializer.instance_state.clone()),
205                membership_coordinator.clone(),
206                BlockBuilderConfig::default(),
207                upgrade_lock.clone(),
208            ))
209            .proposal_validator(ProposalValidator::new(
210                membership_coordinator.clone(),
211                initializer.epoch_height,
212                upgrade_lock.clone(),
213            ))
214            .share_validator(VidShareValidator::new(
215                membership_coordinator.clone(),
216                initializer.epoch_height,
217                upgrade_lock,
218            ))
219            .storage(Storage::new(storage, private_key))
220            .membership_coordinator(membership_coordinator)
221            .timer(Timer::new(
222                timeout_duration,
223                ViewNumber::genesis(),
224                EpochNumber::genesis(),
225            ))
226            .public_key(public_key)
227            .build()
228    }
229
230    /// Emit `ViewChanged(current_view + 1)` and, if leader, a
231    /// `RequestBlockAndHeader`.
232    pub fn start(&mut self) {
233        let cur_view = self.consensus.current_view();
234        let next_view = cur_view + 1;
235        let epoch = self
236            .consensus
237            .current_epoch()
238            .unwrap_or(EpochNumber::genesis());
239
240        if self.consensus.last_decided_leaf().view_number() == ViewNumber::genesis() {
241            // Genesis DA never flows through the normal block-builder path.
242            let genesis_leaf = self.consensus.last_decided_leaf().clone();
243            let (payload, metadata) = T::BlockPayload::empty();
244            self.storage.append_da(
245                ViewNumber::genesis(),
246                EpochNumber::genesis(),
247                payload,
248                metadata,
249                genesis_leaf.payload_commitment(),
250            );
251
252            // Emit `LeafDecided` for genesis so persistence sees the header.
253            self.outbox.push_back(ConsensusOutput::LeafDecided {
254                leaves: vec![genesis_leaf],
255                cert1: self
256                    .consensus
257                    .cert1_at(ViewNumber::genesis())
258                    .cloned()
259                    .expect("genesis cert1 must be seeded"),
260                cert2: None,
261                vid_shares: vec![None],
262            });
263        }
264
265        self.outbox
266            .push_back(ConsensusOutput::ViewChanged(next_view, epoch));
267
268        if let Some(leader) = self.leader(next_view, epoch)
269            && leader == self.public_key
270        {
271            let parent_proposal = self
272                .consensus
273                .proposal_at(cur_view)
274                .expect("parent proposal must be seeded before start()")
275                .clone();
276            self.outbox
277                .push_back(ConsensusOutput::RequestBlockAndHeader(
278                    BlockAndHeaderRequest {
279                        view: next_view,
280                        epoch,
281                        parent_proposal,
282                    },
283                ));
284        }
285    }
286
287    /// Kick the leader after the seed lands when a forwarded TC2 had
288    /// already advanced `current_view`. No-op unless leader and all
289    /// prerequisites are present.
290    fn resume_after_cutover_tc(&mut self) {
291        let cur_view = self.consensus.current_view();
292        if self.consensus.timeout_cert_at(cur_view).is_none() {
293            return;
294        }
295        let epoch = self
296            .consensus
297            .current_epoch()
298            .unwrap_or(EpochNumber::genesis());
299        let Some(leader) = self.leader(cur_view, epoch) else {
300            return;
301        };
302        if leader != self.public_key {
303            return;
304        }
305        let Some(locked_view) = self.consensus.locked_view() else {
306            return;
307        };
308        let Some(parent_proposal) = self.consensus.proposal_at(locked_view).cloned() else {
309            return;
310        };
311        self.outbox
312            .push_back(ConsensusOutput::RequestBlockAndHeader(
313                BlockAndHeaderRequest {
314                    view: cur_view,
315                    epoch,
316                    parent_proposal,
317                },
318            ));
319    }
320
321    pub async fn stop(mut self) {
322        self.network.shutdown().await
323    }
324
325    pub async fn next_consensus_input(&mut self) -> Result<ConsensusInput<T>, CoordinatorError> {
326        loop {
327            select! {
328                message = self.network.receive() => match message {
329                    Ok(m) => {
330                        if let Some(input) = self.on_network_message(m).await {
331                            return Ok(input)
332                        }
333                    }
334                    Err(e) => {
335                        return Err(CoordinatorError::from(e).context("network receive"))
336                    }
337                },
338                () = &mut self.timer => {
339                    let input = ConsensusInput::Timeout(self.timer.view(), self.timer.epoch());
340                    return Ok(input)
341                }
342                Some(output) = self.state_manager.next() => {
343                    if let Some(input) = self.on_state_manager_output(output) {
344                        return Ok(input)
345                    }
346                }
347                Some(request) = self.client.next_request() => {
348                    if let Err(err) = self.on_client_request(request).await {
349                        error!(%err, "error while handling client request");
350                    }
351                }
352                Some(tcert) = self.timeout_collector.next() => {
353                    return Ok(ConsensusInput::TimeoutCertificate(tcert))
354                }
355                Some(out) = self.timeout_one_honest_collector.next() => {
356                    let Some(epoch) = out.data.epoch else {
357                        let msg = format!("missing epoch in view {}", out.view_number());
358                        return Err(CoordinatorError::regular(msg).context("gc timeout one honest"))
359                    };
360                    return Ok(ConsensusInput::TimeoutOneHonest(out.view_number(), epoch))
361                }
362                Some(cert1) = self.vote1_collector.next() => {
363                    return Ok(ConsensusInput::Certificate1(cert1))
364                }
365                Some(cert2) = self.vote2_collector.next() => {
366                    return Ok(ConsensusInput::Certificate2(cert2))
367                }
368                Some((cert1, state_cert)) = self.epoch_root_collector.next() => {
369                    self.storage.append_state_cert(
370                        ViewNumber::new(state_cert.light_client_state.view_number),
371                        state_cert.clone(),
372                    );
373                    return Ok(ConsensusInput::EpochRootCertificates { cert1, state_cert })
374                }
375                Some(item) = self.share_validator.next() => match item {
376                    Ok(vid_share) => {
377                        let view = vid_share.view_number();
378                        let Some(validated) = self.cached_validated_proposals.remove(&view) else {
379                            // Wait for the proposal
380                            self.cached_vid_shares.insert(view, vid_share);
381                            continue;
382                        };
383                        if !check_payload_commitment(&validated.message.proposal, &vid_share) {
384                            continue;
385                        }
386                        return self.on_proposal_and_vid_share(validated, vid_share)
387                    },
388                    Err(e) => {
389                        return Err(CoordinatorError::regular(e).context("vid share validation"))
390                    }
391                },
392                Some(item) = self.proposal_validator.next() => match item {
393                    Ok(validated) => {
394                        // Refresh the network's peer set when a proposal is validated.
395                        let epoch = validated.message.proposal.data.epoch;
396                        if let Err(err) = self
397                            .network
398                            .apply_epoch(epoch, &self.membership_coordinator)
399                        {
400                            error!(%epoch, %err, "network apply_epoch failed");
401                        }
402
403                        let view = validated.message.proposal.data.view_number();
404                        let Some(vid_share) = self.cached_vid_shares.remove(&view) else {
405                            // Wait for the vid share
406                            self.cached_validated_proposals.insert(view, validated);
407                            continue;
408                        };
409                        // Check for commitment correspondence
410                        if !check_payload_commitment(&validated.message.proposal, &vid_share) {
411                            continue;
412                        }
413                        return self.on_proposal_and_vid_share(validated, vid_share)
414                    }
415                    Err(e) => {
416                        return Err(CoordinatorError::regular(e).context("proposal validation"))
417                    }
418                },
419                Some(cert) = self.checkpoint_collector.next() => {
420                    let Some(epoch) = cert.epoch() else {
421                        let msg = format!("missing epoch in view {}", cert.view_number());
422                        return Err(CoordinatorError::critical(msg).context("gc certificate"))
423                    };
424                    self.gc(cert.view_number(), epoch);
425                }
426                Some(item) = self.block_builder.next() => match item {
427                    Ok(block) => {
428                        self.state_manager.request_header(HeaderRequest::from(&block));
429                        let next_view = block.view + 1;
430                        let epoch = block.epoch;
431                        let manifest = block.manifest.clone();
432                        self.storage.append_da(
433                            block.view,
434                            block.epoch,
435                            block.payload.payload.clone(),
436                            block.payload.metadata.clone(),
437                            block.payload_commitment,
438                        );
439                        // We built this block; skip reconstructing it from our own loopback share.
440                        self.vid_reconstructor.mark_reconstructed(block.view);
441                        self.unicast_to_leader(
442                            next_view,
443                            epoch,
444                            BlockMessage::DedupManifest(manifest),
445                        )?;
446                        return Ok(block.into())
447                    }
448                    Err(err) => {
449                        return Err(CoordinatorError::regular(err).context("block building"))
450                    }
451                },
452                Some(item) = self.vid_disperser.next() => match item {
453                    Ok(out) => {
454                        return Ok(ConsensusInput::VidDisperseCreated(out.view, out.disperse))
455                    }
456                    Err(()) => {
457                        return Err(CoordinatorError::unspecified().context("vid disperse"))
458                    }
459                },
460                Some(item) = self.vid_reconstructor.next() => match item {
461                    Ok(out) => {
462                        self.block_builder.on_block_reconstructed(out.tx_commitments);
463                        self.storage.append_da(
464                            out.view,
465                            out.epoch,
466                            out.payload.clone(),
467                            out.metadata.clone(),
468                            VidCommitment::V2(out.payload_commitment),
469                        );
470                        if let Some(proposal) = self.consensus.proposal_at(out.view) {
471                            self.outbox.push_back(ConsensusOutput::BlockPayloadReconstructed {
472                                view: out.view,
473                                header: proposal.block_header.clone(),
474                                payload: out.payload,
475                            });
476                        }
477                        return Ok(ConsensusInput::BlockReconstructed(out.view, out.payload_commitment))
478                    }
479                    Err(()) => {
480                        return Err(CoordinatorError::unspecified().context("vid reconstruction"))
481                    }
482                },
483                Some(result) = self.epoch_manager.next() => match result {
484                    Ok(EpochRootResult::DrbResult(epoch, drb_result)) => {
485                        // New epoch data available — retry votes that were
486                        // buffered because their membership wasn't ready.
487                        self.vote1_collector.retry_pending_votes().await;
488                        self.vote2_collector.retry_pending_votes().await;
489                        self.timeout_collector.retry_pending_votes().await;
490                        self.timeout_one_honest_collector.retry_pending_votes().await;
491                        return Ok(ConsensusInput::DrbResult(epoch, drb_result))
492                    }
493                    Err(failure) => {
494                        // Catchup/compute failed. The epoch manager clears
495                        // the pending guard; consensus's `maybe_propose`
496                        // will re-request the DRB when it next tries to
497                        // build a transition proposal and finds it missing.
498                        warn!(%failure.error, epoch = %failure.epoch, "DRB request failed");
499                        continue;
500                    }
501                },
502                else => {
503                    return Err(CoordinatorError::critical(ErrorSource::NoInput))
504                }
505            }
506        }
507    }
508
509    pub fn apply_consensus(&mut self, input: ConsensusInput<T>) {
510        self.consensus.apply(input, &mut self.outbox)
511    }
512
513    pub fn process_consensus_output(
514        &mut self,
515        output: ConsensusOutput<T>,
516    ) -> Result<(), CoordinatorError> {
517        match output {
518            ConsensusOutput::RequestState(state_request) => {
519                self.state_manager.request_state(state_request);
520            },
521            ConsensusOutput::RequestVidDisperse {
522                view,
523                epoch,
524                payload,
525                metadata,
526            } => {
527                self.vid_disperser.request_vid_disperse(VidDisperseRequest {
528                    view,
529                    epoch,
530                    block: payload,
531                    metadata,
532                });
533            },
534            ConsensusOutput::RequestDrbResult(epoch) => {
535                self.epoch_manager.request_drb_result(epoch);
536            },
537            ConsensusOutput::SendCheckpointVote(checkpoint_vote) => {
538                let message = Message {
539                    sender: self.public_key.clone(),
540                    message_type: MessageType::Consensus(ConsensusMessage::Checkpoint(
541                        checkpoint_vote,
542                    )),
543                };
544                self.network
545                    .broadcast(message.view_number(), &message)
546                    .map_err(|e| CoordinatorError::from(e).context("broadcast checkpoint vote"))?
547            },
548            ConsensusOutput::LeafDecided { leaves, cert2, .. } => {
549                if let Some(cert2) = cert2 {
550                    self.storage.append_cert2(cert2.view_number, cert2.clone());
551                }
552                for leaf in leaves {
553                    self.epoch_manager.handle_leaf_decided(leaf);
554                }
555            },
556            ConsensusOutput::LockUpdated(_) => {}, // TODO
557            ConsensusOutput::RequestBlockAndHeader(request) => {
558                self.block_builder.request_block(request);
559            },
560            ConsensusOutput::SendProposal(proposal) => {
561                self.storage.append_proposal(proposal.data.clone());
562                // TODO: This may be done async in network so we do not spend
563                // too much time here in this loop.
564
565                let message = Message {
566                    sender: self.public_key.clone(),
567                    message_type: MessageType::Consensus(ConsensusMessage::Proposal(
568                        ProposalMessage::validated(proposal.clone()),
569                    )),
570                };
571                if let Err(err) = self.network.broadcast(message.view_number(), &message) {
572                    let err = CoordinatorError::from(err).context("proposal broadcast");
573                    if err.severity == Severity::Critical {
574                        return Err(err);
575                    } else {
576                        warn!(%err, "network error while broadcasting proposal")
577                    }
578                }
579            },
580            ConsensusOutput::SendVidShares(vid_shares) => {
581                for share in vid_shares {
582                    let recipient = share.data.recipient_key.clone();
583                    let message = Message {
584                        sender: self.public_key.clone(),
585                        message_type: MessageType::Consensus(ConsensusMessage::VidShare(share)),
586                    };
587                    if let Err(err) =
588                        self.network
589                            .unicast(message.view_number(), &recipient, &message)
590                    {
591                        let err = CoordinatorError::from(err).context("vid share unicast");
592                        if err.severity == Severity::Critical {
593                            return Err(err);
594                        } else {
595                            warn!(%err, "network error while sending vid share")
596                        }
597                    }
598                }
599            },
600            ConsensusOutput::SendTimeoutVote(vote, lock) => {
601                let message = Message {
602                    sender: self.public_key.clone(),
603                    message_type: MessageType::Consensus(ConsensusMessage::TimeoutVote(
604                        message::TimeoutVoteMessage { vote, lock },
605                    )),
606                };
607                self.network
608                    .broadcast(message.view_number(), &message)
609                    .map_err(|e| CoordinatorError::from(e).context("broadcast timeout vote"))?
610            },
611            ConsensusOutput::SendTimeoutCertificate(tc, view, epoch) => {
612                if let Some(leader) = self.leader(view, epoch) {
613                    let message = Message {
614                        sender: self.public_key.clone(),
615                        message_type: MessageType::Consensus(ConsensusMessage::TimeoutCertificate(
616                            tc,
617                        )),
618                    };
619                    self.network
620                        .unicast(message.view_number(), &leader, &message)
621                        .map_err(|e| CoordinatorError::from(e).context("timeout certificate"))?;
622                }
623            },
624            ConsensusOutput::SendVote1(vote1) => {
625                let message = Message {
626                    sender: self.public_key.clone(),
627                    message_type: MessageType::Consensus(ConsensusMessage::Vote1(vote1)),
628                };
629                self.network
630                    .broadcast(message.view_number(), &message)
631                    .map_err(|e| CoordinatorError::from(e).context("broadcast vote1"))?
632            },
633            ConsensusOutput::SendVote2(vote2) => {
634                let message = Message {
635                    sender: self.public_key.clone(),
636                    message_type: MessageType::Consensus(ConsensusMessage::Vote2(vote2)),
637                };
638                self.network
639                    .broadcast(message.view_number(), &message)
640                    .map_err(|e| CoordinatorError::from(e).context("broadcast vote2"))?
641            },
642            ConsensusOutput::SendEpochChange(epoch_change) => {
643                let message = Message {
644                    sender: self.public_key.clone(),
645                    message_type: MessageType::Consensus(ConsensusMessage::EpochChange(
646                        epoch_change,
647                    )),
648                };
649                self.network
650                    .broadcast(message.view_number(), &message)
651                    .map_err(|e| CoordinatorError::from(e).context("broadcast epoch change"))?
652            },
653            ConsensusOutput::SendCertificate1(cert1) => {
654                let message = Message {
655                    sender: self.public_key.clone(),
656                    message_type: MessageType::Consensus(ConsensusMessage::Certificate1(
657                        cert1,
658                        self.public_key.clone(),
659                    )),
660                };
661                self.network
662                    .broadcast(message.view_number(), &message)
663                    .map_err(|e| CoordinatorError::from(e).context("broadcast certificate1"))?
664            },
665            ConsensusOutput::ProposalValidated { .. } => {},
666            ConsensusOutput::ViewChanged(view, epoch) => {
667                self.consensus.set_view(view, epoch);
668                self.timer.reset_with_epoch(view, epoch);
669                let txns = self.block_builder.on_view_changed(view, epoch);
670                if !txns.is_empty() {
671                    let next_view = view + 1;
672                    self.unicast_to_leader(
673                        next_view,
674                        epoch,
675                        BlockMessage::Transactions(TransactionMessage {
676                            view: next_view,
677                            transactions: txns,
678                        }),
679                    )
680                    .map_err(|e| e.context("unicast transactions"))?;
681                }
682
683                // Proactively fetch the DRB for the next epoch so
684                // late-starting nodes have it before they need it
685                let next_epoch = epoch + 1;
686                if next_epoch > EpochNumber::genesis() + 1 {
687                    self.epoch_manager.request_drb_result(next_epoch);
688                }
689            },
690            ConsensusOutput::BlockPayloadReconstructed { .. } => {},
691        }
692        Ok(())
693    }
694
    /// Returns this coordinator's `node_id`, the `KeyPrefix` that `gc` attaches
    /// to its log line as the `node` field.
    pub fn node_id(&self) -> &KeyPrefix {
        &self.node_id
    }
698
    /// Returns a shared reference to the queue of pending [`ConsensusOutput`]s.
    pub fn outbox(&self) -> &Outbox<ConsensusOutput<T>> {
        &self.outbox
    }
702
    /// Returns a mutable reference to the queue of pending [`ConsensusOutput`]s,
    /// letting the caller push or pop entries directly.
    pub fn outbox_mut(&mut self) -> &mut Outbox<ConsensusOutput<T>> {
        &mut self.outbox
    }
706
    /// Returns a shared reference to the queue of [`CoordinatorOutput`]s
    /// (for example the `ExternalMessageReceived` entries pushed by
    /// `on_network_message`).
    pub fn coordinator_outbox(&self) -> &Outbox<CoordinatorOutput<T>> {
        &self.coordinator_outbox
    }
710
    /// Returns a mutable reference to the queue of [`CoordinatorOutput`]s.
    pub fn coordinator_outbox_mut(&mut self) -> &mut Outbox<CoordinatorOutput<T>> {
        &mut self.coordinator_outbox
    }
714
    /// Returns the current view, as reported by `self.consensus.current_view()`.
    pub fn current_view(&self) -> ViewNumber {
        self.consensus.current_view()
    }
718
    /// Looks up the state entry for view `v` in the state manager.
    ///
    /// Returns whatever `StateManager::get_state` yields, i.e. `None` when it
    /// has no entry for `v`.
    pub fn state(&self, v: ViewNumber) -> Option<&StateEntry<T>> {
        self.state_manager.get_state(v)
    }
722
    /// Returns the [`ClientApi`] handle exposed by the coordinator's client
    /// (`self.client.handle()`).
    pub fn client_api(&self) -> &ClientApi<T> {
        self.client.handle()
    }
726
727    pub(crate) async fn on_network_message(
728        &mut self,
729        message: Message<T, Unchecked>,
730    ) -> Option<ConsensusInput<T>> {
731        match message.message_type {
732            MessageType::Consensus(msg) => match msg {
733                ConsensusMessage::Proposal(p) => {
734                    if self.consensus.wants_proposal_for_view(&p.view_number()) {
735                        self.proposal_validator.validate(p);
736                    }
737                    None
738                },
739                ConsensusMessage::VidShare(share) => {
740                    if self
741                        .consensus
742                        .wants_proposal_for_view(&share.data.view_number())
743                    {
744                        self.share_validator.validate(share);
745                    }
746                    None
747                },
748                ConsensusMessage::Vote1(vote1) => {
749                    let bn = vote1.vote.data.block_number.unwrap_or(0);
750                    let epoch_height = *self.consensus.epoch_height;
751                    if is_epoch_root(bn, epoch_height) {
752                        // An epoch-root Vote1 MUST carry a state_vote.
753                        // Reject otherwise.
754                        vote1.state_vote.as_ref()?;
755                        self.epoch_root_collector.accumulate(vote1.clone()).await;
756                    } else {
757                        self.vote1_collector
758                            .accumulate_vote(vote1.vote.clone())
759                            .await;
760                    }
761                    self.vid_reconstructor
762                        .handle_vid_share(vote1.vid_share, None);
763                    None
764                },
765                ConsensusMessage::Vote2(vote2) => {
766                    self.vote2_collector.accumulate_vote(vote2).await;
767                    None
768                },
769                ConsensusMessage::Certificate1(certificate1, _key) => {
770                    Some(ConsensusInput::Certificate1(certificate1))
771                },
772                ConsensusMessage::Certificate2(certificate2, _key) => {
773                    Some(ConsensusInput::Certificate2(certificate2))
774                },
775                ConsensusMessage::TimeoutVote(timeout_msg) => {
776                    self.timeout_collector
777                        .accumulate_vote(timeout_msg.vote.clone())
778                        .await;
779                    self.timeout_one_honest_collector
780                        .accumulate_vote(timeout_msg.vote)
781                        .await;
782                    None
783                },
784                ConsensusMessage::TimeoutCertificate(tc) => {
785                    Some(ConsensusInput::TimeoutCertificate(tc))
786                },
787                ConsensusMessage::EpochChange(epoch_change) => {
788                    Some(ConsensusInput::EpochChange(epoch_change))
789                },
790                ConsensusMessage::Checkpoint(checkpoint) => {
791                    self.checkpoint_collector.accumulate_vote(checkpoint).await;
792                    None
793                },
794            },
795            MessageType::Block(msg) => {
796                match msg {
797                    BlockMessage::Transactions(msg) => self.block_builder.on_transactions(msg),
798                    BlockMessage::DedupManifest(manifest) => {
799                        if let Some(view_leader) = self.leader(manifest.view, manifest.epoch)
800                            && view_leader == message.sender
801                        {
802                            self.block_builder.on_dedup_manifest(manifest)
803                        }
804                    },
805                }
806                None
807            },
808            MessageType::ProposalFetch(ProposalFetchMessage::Request(request)) => {
809                if !request.validate_sender(&message.sender) {
810                    warn!(
811                        sender = %message.sender,
812                        view = %request.view_number(),
813                        "ignoring invalid proposal fetch request signature"
814                    );
815                    return None;
816                }
817                if let Some(proposal) = self
818                    .consensus
819                    .signed_proposal(&request.view_number())
820                    .cloned()
821                {
822                    let response = Message {
823                        sender: self.public_key.clone(),
824                        message_type: MessageType::ProposalFetch(ProposalFetchMessage::Response(
825                            Box::new(proposal),
826                        )),
827                    };
828
829                    if let Err(err) =
830                        self.network
831                            .unicast(request.view_number(), &message.sender, &response)
832                    {
833                        let err = CoordinatorError::from(err).context("proposal response");
834                        warn!(%err, "network error while sending proposal response");
835                    }
836                }
837                None
838            },
839            MessageType::ProposalFetch(ProposalFetchMessage::Response(proposal)) => {
840                self.pending_proposal_fetches.resolve(&proposal);
841                None
842            },
843            MessageType::External(data) => {
844                self.coordinator_outbox
845                    .push_back(CoordinatorOutput::ExternalMessageReceived {
846                        sender: message.sender,
847                        data,
848                    });
849                None
850            },
851        }
852    }
853
854    fn on_state_manager_output(
855        &mut self,
856        output: StateManagerOutput<T>,
857    ) -> Option<ConsensusInput<T>> {
858        match output {
859            StateManagerOutput::State {
860                response,
861                validated: true,
862            } => Some(ConsensusInput::StateValidated(response)),
863            StateManagerOutput::State {
864                response,
865                validated: false,
866            } => Some(ConsensusInput::StateValidationFailed(response)),
867            StateManagerOutput::Header {
868                response,
869                header: Some(hdr),
870            } => Some(ConsensusInput::HeaderCreated(
871                response.view,
872                proposal_commitment(&response.parent_proposal),
873                hdr,
874            )),
875            StateManagerOutput::Header {
876                response,
877                header: None,
878            } => {
879                tracing::warn!(view = %response.view, "header creation failed");
880                None
881            },
882        }
883    }
884
885    fn on_proposal_and_vid_share(
886        &mut self,
887        validated: ValidatedProposal<T>,
888        vid_share: VidDisperseShare2<T>,
889    ) -> Result<ConsensusInput<T>, CoordinatorError> {
890        self.storage.append_vid(vid_share.clone());
891        self.storage
892            .append_proposal(validated.message.proposal.data.clone());
893
894        let m = validated
895            .message
896            .proposal
897            .data
898            .block_header
899            .metadata()
900            .clone();
901        self.vid_reconstructor
902            .handle_vid_share(vid_share.clone(), m);
903
904        // GC for the cache
905        let view = validated.message.proposal.data.view_number();
906        self.cached_vid_shares = self.cached_vid_shares.split_off(&(view + 1));
907        self.cached_validated_proposals = self.cached_validated_proposals.split_off(&(view + 1));
908
909        Ok(ConsensusInput::ProposalWithVidShare(
910            validated.sender,
911            validated.message,
912            vid_share,
913        ))
914    }
915
916    fn unicast_to_leader(
917        &mut self,
918        view: ViewNumber,
919        epoch: EpochNumber,
920        msg: BlockMessage<T>,
921    ) -> Result<(), CoordinatorError> {
922        let Some(leader) = self.leader(view, epoch) else {
923            warn!(%view, %epoch, "failed to resolve leader for unicast");
924            return Ok(());
925        };
926        let message = Message {
927            sender: self.public_key.clone(),
928            message_type: MessageType::Block(msg),
929        };
930        self.network
931            .unicast(message.view_number(), &leader, &message)
932            .map_err(|e| CoordinatorError::from(e).context("leader unicast"))
933    }
934
935    fn leader(&mut self, view: ViewNumber, epoch: EpochNumber) -> Option<T::SignatureKey> {
936        let membership = self
937            .membership_coordinator
938            .membership_for_epoch(Some(epoch))
939            .ok()?;
940        membership.leader(view).ok()
941    }
942
    /// Serves one request coming from the client API.
    ///
    /// Every variant carries a responder on which the answer is sent. Send
    /// results are deliberately ignored (`let _ =`), presumably because the
    /// requester may already have dropped its receiving end.
    ///
    /// # Errors
    ///
    /// Only `RequestProposal` can fail: signing or broadcasting the fetch
    /// request returns a [`CoordinatorError`]. All other network failures in
    /// this function are logged or reported to the requester instead.
    async fn on_client_request(
        &mut self,
        request: ClientRequest<T>,
    ) -> Result<(), CoordinatorError> {
        match request {
            // Simple read-only queries answered from consensus / the state manager.
            ClientRequest::CurrentView(tx) => {
                let _ = tx.send(self.consensus.current_view());
            },
            ClientRequest::CurrentEpoch(tx) => {
                let _ = tx.send(self.consensus.current_epoch());
            },
            ClientRequest::DecidedLeaf(tx) => {
                let _ = tx.send(self.consensus.last_decided_leaf().clone());
            },
            ClientRequest::DecidedState(tx) => {
                // The decided state is the state stored for the last decided
                // leaf's view; `None` if the state manager has no entry for it.
                let view = self.consensus.last_decided_leaf().view_number();
                let _ = tx.send(self.state(view).map(|s| s.state.clone()));
            },
            ClientRequest::UndecidedLeaves(tx) => {
                let _ = tx.send(self.consensus.undecided_leaves().cloned().collect());
            },
            ClientRequest::GetState { view, respond } => {
                let _ = respond.send(self.state(view).map(|s| s.state.clone()));
            },
            ClientRequest::GetStateAndDelta { view, respond } => {
                let _ = respond.send(match self.state(view) {
                    Some(s) => (Some(s.state.clone()), s.delta.clone()),
                    None => (None, None),
                });
            },
            ClientRequest::SubmitTransaction { tx, respond } => {
                self.block_builder.on_submit_transaction(tx);
                let _ = respond.send(());
            },
            ClientRequest::UpdateLeaf { update, respond } => {
                self.state_manager.update_state(update);
                let _ = respond.send(());
            },
            ClientRequest::RequestProposal {
                view,
                leaf_commitment,
                respond,
            } => {
                // Fast path: we already hold a signed proposal for this view
                // and its commitment is the one being asked for.
                if let Some(proposal) = self.consensus.signed_proposal(&view)
                    && proposal_commitment(&proposal.data) == leaf_commitment
                {
                    let _ = respond.send(Ok(proposal.clone()));
                    return Ok(());
                }
                // Only broadcast a fetch request if none is already pending for
                // the same (view, commitment); otherwise just join the waiters.
                // NOTE(review): if signing or broadcasting fails, `?` returns
                // before `respond` is registered, so the responder is dropped
                // without an explicit error being sent on it.
                if !self
                    .pending_proposal_fetches
                    .contains_request(view, leaf_commitment)
                {
                    let request =
                        self.consensus
                            .signed_proposal_fetch_request(view)
                            .map_err(|err| {
                                let err = format!("failed to sign proposal request: {err}");
                                CoordinatorError::regular(err).context("sign proposal request")
                            })?;

                    let message = Message {
                        sender: self.public_key.clone(),
                        message_type: MessageType::ProposalFetch(ProposalFetchMessage::Request(
                            request,
                        )),
                    };

                    self.network
                        .broadcast(message.view_number(), &message)
                        .map_err(|err| {
                            CoordinatorError::from(err).context("broadcast proposal request")
                        })?;
                }
                // The responder is resolved later, when a matching
                // `ProposalFetchMessage::Response` arrives.
                self.pending_proposal_fetches
                    .push(view, leaf_commitment, respond);
            },
            ClientRequest::SendExternalMessage {
                view,
                payload,
                recipient,
                respond,
            } => {
                let message = Message {
                    sender: self.public_key.clone(),
                    message_type: MessageType::External(payload),
                };
                // The unicast outcome is reported to the requester rather than
                // returned from this function.
                let result = self
                    .network
                    .unicast(view, &recipient, &message)
                    .map_err(|err| {
                        CoordinatorError::from(err)
                            .context("send external message")
                            .into()
                    });
                let _ = respond.send(result);
            },
            ClientRequest::SeedPreCutover { seed, respond } => {
                tracing::info!(
                    undecided = seed.undecided.len(),
                    anchor_view = *seed.decided_anchor.view_number(),
                    high_qc_view = seed.high_qc.as_ref().map(|qc| *qc.view_number()),
                    cutover_view = *seed.cutover_view,
                    states = seed.validated_states.len(),
                    "coordinator: applying legacy → new-protocol seed",
                );

                // State manager is owned by the coordinator, so the
                // validated-state map must be applied here before the
                // seed is consumed by consensus.
                let anchor_view = seed.decided_anchor.view_number();
                if let Some(state) = seed.validated_states.get(&anchor_view).cloned() {
                    self.state_manager
                        .seed_state(anchor_view, state, seed.decided_anchor.clone());
                }
                for leaf in &seed.undecided {
                    let view = leaf.view_number();
                    if let Some(state) = seed.validated_states.get(&view).cloned() {
                        self.state_manager.seed_state(view, state, leaf.clone());
                    }
                }

                // Derive the cutover epoch from the last undecided leaf (or the
                // decided anchor when there is none) and copy `cutover_view`
                // out, both before `seed` is moved into consensus below.
                let highest_seeded_leaf = seed.undecided.last().unwrap_or(&seed.decided_anchor);
                let cutover_epoch = EpochNumber::new(epoch_from_block_number(
                    highest_seeded_leaf.block_header().block_number(),
                    *self.consensus.epoch_height,
                ));
                let cutover_view = seed.cutover_view;

                self.consensus.apply_pre_cutover_seed(seed);

                // Refresh peers for the cutover epoch before kicking the
                // leader — the proposal-driven site can't fire yet.
                if let Err(err) = self
                    .network
                    .apply_epoch(cutover_epoch, &self.membership_coordinator)
                {
                    tracing::error!(
                        %cutover_epoch,
                        %err,
                        "network on_epoch_change failed during seed_pre_cutover",
                    );
                }

                // Pick how to resume after seeding:
                //  1. a timeout certificate exists for the current view;
                //  2. the current view directly precedes the cutover view and
                //     both a cert1 and a proposal exist for it;
                //  3. otherwise, just announce the current view.
                let cur_view = self.consensus.current_view();
                if self.consensus.timeout_cert_at(cur_view).is_some() {
                    self.resume_after_cutover_tc();
                } else if cur_view + 1 == cutover_view
                    && self.consensus.cert1_at(cur_view).is_some()
                    && self.consensus.proposal_at(cur_view).is_some()
                {
                    self.start();
                } else {
                    let epoch = self
                        .consensus
                        .current_epoch()
                        .unwrap_or(EpochNumber::genesis());
                    self.outbox
                        .push_back(ConsensusOutput::ViewChanged(cur_view, epoch));
                }
                // Drain whatever is queued in the outbox right away; failures
                // are logged so that the remaining outputs are still processed.
                while let Some(output) = self.outbox.pop_front() {
                    if let Err(err) = self.process_consensus_output(output) {
                        tracing::warn!(
                            %err,
                            "error processing post-seed bootstrap output"
                        );
                    }
                }
                let _ = respond.send(());
            },
            ClientRequest::SubmitTimeoutVote { vote, respond } => {
                // Feed the vote to both local timeout collectors first.
                self.timeout_collector.accumulate_vote(vote.clone()).await;
                self.timeout_one_honest_collector
                    .accumulate_vote(vote.clone())
                    .await;
                // Rebroadcast so peer coordinators can aggregate too.
                let message = Message {
                    sender: self.public_key.clone(),
                    message_type: MessageType::Consensus(ConsensusMessage::TimeoutVote(
                        message::TimeoutVoteMessage { vote, lock: None },
                    )),
                };
                // Best effort: a failed rebroadcast is only logged.
                if let Err(err) = self.network.broadcast(message.view_number(), &message) {
                    tracing::warn!(%err, "failed to rebroadcast bridged timeout vote");
                }
                let _ = respond.send(());
            },
            ClientRequest::BumpNetworkEpoch { epoch, respond } => {
                // Failure is logged; the requester is acknowledged either way.
                if let Err(err) = self
                    .network
                    .apply_epoch(epoch, &self.membership_coordinator)
                {
                    tracing::warn!(%epoch, %err, "network on_epoch_change failed");
                }
                let _ = respond.send(());
            },
        }

        Ok(())
    }
1143
    /// Garbage-collects every component owned by the coordinator.
    ///
    /// `view` is passed to the per-view components and `epoch` to those that
    /// also (or only) track epochs. The two proposal/VID caches are trimmed in
    /// place afterwards.
    fn gc(&mut self, view: ViewNumber, epoch: EpochNumber) {
        info!(node = %self.node_id, %view, "garbage collecting");
        self.consensus.gc(view, epoch);
        self.checkpoint_collector.gc(view, epoch);
        // NOTE(review): the result of the network GC is discarded here.
        let _ = self.network.gc(view); // TODO
        self.state_manager.gc(view);
        self.vid_disperser.gc(view);
        self.vid_reconstructor.gc(view);
        self.vote1_collector.gc(view, epoch);
        self.vote2_collector.gc(view, epoch);
        self.timeout_collector.gc(view, epoch);
        self.timeout_one_honest_collector.gc(view, epoch);
        self.epoch_root_collector.gc(view, epoch);
        self.epoch_manager.gc(epoch);
        self.block_builder.gc(view);
        self.pending_proposal_fetches.gc(view);
        self.storage.gc(view);
        // `split_off(&view)` returns the entries with key >= `view`, so
        // everything older than `view` is dropped from both caches.
        self.cached_validated_proposals = self.cached_validated_proposals.split_off(&view);
        self.cached_vid_shares = self.cached_vid_shares.split_off(&view);
    }
1164}
1165
1166fn check_payload_commitment<T: NodeType>(
1167    proposal: &SignedProposal<T, Proposal<T>>,
1168    vid_share: &VidDisperseShare2<T>,
1169) -> bool {
1170    let VidCommitment::V2(commit) = proposal.data.block_header.payload_commitment() else {
1171        warn!(
1172            "unexpected payload commitment type in view {}, proposal discarded",
1173            proposal.data.view_number
1174        );
1175        return false;
1176    };
1177    if commit != vid_share.payload_commitment {
1178        warn!(
1179            "payload commitment mismatch in view {}, discard the proposal",
1180            proposal.data.view_number
1181        );
1182        return false;
1183    }
1184    true
1185}
1186
/// One-shot sender on which the outcome of a proposal fetch is delivered to
/// the requester: the signed proposal on success, a [`QueryError`] otherwise.
type ProposalFetchResponseSender<T> =
    oneshot::Sender<Result<SignedProposal<T, Proposal<T>>, QueryError>>;
1189
/// Identifies a pending proposal fetch: the view that was requested together
/// with the leaf commitment the returned proposal must have.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct ProposalFetchKey<T: NodeType> {
    /// View of the requested proposal.
    view: ViewNumber,
    /// Commitment the fetched proposal is expected to match.
    leaf_commitment: Commitment<Leaf2<T>>,
}
1195
impl<T: NodeType> ProposalFetchKey<T> {
    /// Builds the key for a fetch of the proposal at `view` with the given
    /// `leaf_commitment`.
    fn new(view: ViewNumber, leaf_commitment: Commitment<Leaf2<T>>) -> Self {
        Self {
            view,
            leaf_commitment,
        }
    }
}
1204
/// Responders waiting for a fetched proposal, grouped by [`ProposalFetchKey`].
///
/// Several requesters may wait on the same key; all of them are answered when
/// a matching proposal is resolved.
#[derive(Default)]
struct PendingProposalFetches<T: NodeType> {
    /// Waiting responders per (view, leaf commitment).
    pending: HashMap<ProposalFetchKey<T>, Vec<ProposalFetchResponseSender<T>>>,
}
1209
1210impl<T: NodeType> PendingProposalFetches<T> {
1211    fn prune_closed(&mut self) {
1212        self.pending.retain(|_, responders| {
1213            responders.retain(|respond| !respond.is_closed());
1214            !responders.is_empty()
1215        });
1216    }
1217
1218    fn contains_request(
1219        &mut self,
1220        view: ViewNumber,
1221        leaf_commitment: Commitment<Leaf2<T>>,
1222    ) -> bool {
1223        self.prune_closed();
1224        self.pending
1225            .contains_key(&ProposalFetchKey::new(view, leaf_commitment))
1226    }
1227
1228    fn push(
1229        &mut self,
1230        view: ViewNumber,
1231        leaf_commitment: Commitment<Leaf2<T>>,
1232        respond: ProposalFetchResponseSender<T>,
1233    ) {
1234        self.pending
1235            .entry(ProposalFetchKey::new(view, leaf_commitment))
1236            .or_default()
1237            .push(respond);
1238    }
1239
1240    #[allow(dead_code)]
1241    fn gc(&mut self, view: ViewNumber) {
1242        self.pending.retain(|key, responders| {
1243            responders.retain(|respond| !respond.is_closed());
1244            key.view >= view && !responders.is_empty()
1245        });
1246    }
1247
1248    fn resolve(&mut self, proposal: &SignedProposal<T, Proposal<T>>) {
1249        self.prune_closed();
1250        let view = proposal.data.view_number;
1251        let leaf_commitment = proposal_commitment(&proposal.data);
1252        let key = ProposalFetchKey::new(view, leaf_commitment);
1253
1254        if let Some(responders) = self.pending.remove(&key) {
1255            for respond in responders {
1256                let _ = respond.send(Ok(proposal.clone()));
1257            }
1258        }
1259    }
1260}