1pub mod error;
2pub mod timer;
3
4use std::{
5 collections::{BTreeMap, HashMap},
6 sync::Arc,
7 time::Duration,
8};
9
10use bon::{Builder, bon};
11use committable::Commitment;
12use hotshot::{HotShotInitializer, traits::BlockPayload, types::SignatureKey};
13use hotshot_types::{
14 data::{EpochNumber, Leaf2, VidCommitment, VidDisperseShare2, ViewNumber},
15 epoch_membership::EpochMembershipCoordinator,
16 message::{Proposal as SignedProposal, UpgradeLock},
17 simple_certificate::{QuorumCertificate2, TimeoutCertificate2},
18 simple_vote::{HasEpoch, QuorumVote2, TimeoutVote2},
19 traits::{
20 block_contents::BlockHeader, node_implementation::NodeType,
21 signature_key::StateSignatureKey,
22 },
23 utils::{epoch_from_block_number, is_epoch_root},
24 vote::HasViewNumber,
25};
26use tokio::{select, sync::oneshot};
27use tracing::{error, info, warn};
28
29use crate::{
30 block::{BlockAndHeaderRequest, BlockBuilder, BlockBuilderConfig},
31 client::{ClientApi, ClientRequest, CoordinatorClient, QueryError},
32 consensus::{Consensus, ConsensusInput, ConsensusOutput},
33 coordinator::{
34 error::{CoordinatorError, ErrorSource, Severity},
35 timer::Timer,
36 },
37 epoch::{EpochManager, EpochRootResult},
38 epoch_root_vote_collector::EpochRootVoteCollector,
39 helpers::proposal_commitment,
40 logging::KeyPrefix,
41 message::{
42 self, BlockMessage, Certificate2, CheckpointCertificate, CheckpointVote, ConsensusMessage,
43 Message, MessageType, Proposal, ProposalFetchMessage, ProposalMessage, TimeoutOneHonest,
44 TransactionMessage, Unchecked, Vote2,
45 },
46 network::Network,
47 outbox::Outbox,
48 proposal::{ProposalValidator, ValidatedProposal, VidShareValidator},
49 state::{HeaderRequest, StateEntry, StateManager, StateManagerOutput},
50 storage::{NewProtocolStorage, Storage},
51 vid::{VidDisperseRequest, VidDisperser, VidReconstructor},
52 vote::VoteCollector,
53};
54
55#[allow(clippy::large_enum_variant)]
56pub enum CoordinatorOutput<T: NodeType> {
57 Consensus(ConsensusOutput<T>),
58 ExternalMessageReceived {
59 sender: T::SignatureKey,
60 data: Vec<u8>,
61 },
62}
63
64#[derive(Builder)]
65pub struct Coordinator<T: NodeType, N, S> {
66 membership_coordinator: EpochMembershipCoordinator<T>,
67 consensus: Consensus<T>,
68 network: N,
69 state_manager: StateManager<T>,
70 #[builder(default)]
71 client: CoordinatorClient<T>,
72 vid_disperser: VidDisperser<T>,
73 vid_reconstructor: VidReconstructor<T>,
74 vote1_collector: VoteCollector<T, QuorumVote2<T>, QuorumCertificate2<T>>,
75 vote2_collector: VoteCollector<T, Vote2<T>, Certificate2<T>>,
76 timeout_collector: VoteCollector<T, TimeoutVote2<T>, TimeoutCertificate2<T>>,
77 timeout_one_honest_collector: VoteCollector<T, TimeoutVote2<T>, TimeoutOneHonest<T>>,
78 checkpoint_collector: VoteCollector<T, CheckpointVote<T>, CheckpointCertificate<T>>,
79 epoch_root_collector: EpochRootVoteCollector<T>,
80 epoch_manager: EpochManager<T>,
81 block_builder: BlockBuilder<T>,
82 proposal_validator: ProposalValidator<T>,
83 share_validator: VidShareValidator<T>,
84 storage: Storage<T, S>,
85 #[builder(default)]
86 outbox: Outbox<ConsensusOutput<T>>,
87 #[builder(default)]
88 coordinator_outbox: Outbox<CoordinatorOutput<T>>,
89 public_key: T::SignatureKey,
90 #[builder(default = KeyPrefix::from(&public_key))]
91 node_id: KeyPrefix,
92 timer: Timer,
93 #[builder(skip)]
94 pending_proposal_fetches: PendingProposalFetches<T>,
95 #[builder(default)]
96 cached_validated_proposals: BTreeMap<ViewNumber, ValidatedProposal<T>>,
97 #[builder(default)]
98 cached_vid_shares: BTreeMap<ViewNumber, VidDisperseShare2<T>>,
99}
100
101#[bon]
102impl<T, N, S> Coordinator<T, N, S>
103where
104 T: NodeType,
105 N: Network<T>,
106 S: NewProtocolStorage<T>,
107{
108 #[builder(builder_type = CoordinatorMaker, finish_fn = make)]
109 #[allow(clippy::too_many_arguments)]
110 pub fn maker(
111 membership_coordinator: EpochMembershipCoordinator<T>,
112 network: N,
113 initializer: &HotShotInitializer<T>,
114 upgrade_lock: UpgradeLock<T>,
115 public_key: T::SignatureKey,
116 private_key: <T::SignatureKey as SignatureKey>::PrivateKey,
117 state_private_key: <T::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
118 stake_table_capacity: usize,
119 timeout_duration: Duration,
120 storage: S,
121 garbage_collection_interval: u64,
122 ) -> Self {
123 let mut consensus = Consensus::new(
124 membership_coordinator.clone(),
125 public_key.clone(),
126 private_key.clone(),
127 state_private_key,
128 stake_table_capacity,
129 upgrade_lock.clone(),
130 initializer.anchor_leaf.clone(),
131 initializer.epoch_height,
132 garbage_collection_interval,
133 );
134
135 let genesis_cert1 = initializer.high_qc.clone();
136 let genesis_proposal = message::Proposal {
137 block_header: initializer.anchor_leaf.block_header().clone(),
138 view_number: ViewNumber::genesis(),
139 epoch: EpochNumber::genesis(),
140 justify_qc: genesis_cert1.clone(),
141 next_epoch_justify_qc: None,
142 upgrade_certificate: None,
143 view_change_evidence: None,
144 next_drb_result: None,
145 state_cert: None,
146 };
147 let mut state_manager = StateManager::new(
148 Arc::new(initializer.instance_state.clone()),
149 upgrade_lock.clone(),
150 );
151 state_manager.seed_state(
152 initializer.anchor_leaf.view_number(),
153 initializer.anchor_state.clone(),
154 initializer.anchor_leaf.clone(),
155 );
156 state_manager.seed_state(
162 ViewNumber::genesis(),
163 initializer.anchor_state.clone(),
164 Leaf2::from(genesis_proposal.clone()),
165 );
166 consensus.seed_genesis(genesis_cert1, genesis_proposal);
167
168 let lock = upgrade_lock.clone();
169 Self::builder()
170 .consensus(consensus)
171 .network(network)
172 .state_manager(state_manager)
173 .vid_disperser(VidDisperser::new(membership_coordinator.clone()))
174 .vid_reconstructor(VidReconstructor::new())
175 .vote1_collector(VoteCollector::new(
176 membership_coordinator.clone(),
177 lock.clone(),
178 ))
179 .vote2_collector(VoteCollector::new(
180 membership_coordinator.clone(),
181 lock.clone(),
182 ))
183 .timeout_collector(VoteCollector::new(
184 membership_coordinator.clone(),
185 lock.clone(),
186 ))
187 .timeout_one_honest_collector(VoteCollector::new(
188 membership_coordinator.clone(),
189 lock.clone(),
190 ))
191 .checkpoint_collector(VoteCollector::new(
192 membership_coordinator.clone(),
193 lock.clone(),
194 ))
195 .epoch_root_collector(EpochRootVoteCollector::new(
196 membership_coordinator.clone(),
197 lock,
198 ))
199 .epoch_manager(EpochManager::new(
200 initializer.epoch_height,
201 membership_coordinator.clone(),
202 ))
203 .block_builder(BlockBuilder::new(
204 Arc::new(initializer.instance_state.clone()),
205 membership_coordinator.clone(),
206 BlockBuilderConfig::default(),
207 upgrade_lock.clone(),
208 ))
209 .proposal_validator(ProposalValidator::new(
210 membership_coordinator.clone(),
211 initializer.epoch_height,
212 upgrade_lock.clone(),
213 ))
214 .share_validator(VidShareValidator::new(
215 membership_coordinator.clone(),
216 initializer.epoch_height,
217 upgrade_lock,
218 ))
219 .storage(Storage::new(storage, private_key))
220 .membership_coordinator(membership_coordinator)
221 .timer(Timer::new(
222 timeout_duration,
223 ViewNumber::genesis(),
224 EpochNumber::genesis(),
225 ))
226 .public_key(public_key)
227 .build()
228 }
229
230 pub fn start(&mut self) {
233 let cur_view = self.consensus.current_view();
234 let next_view = cur_view + 1;
235 let epoch = self
236 .consensus
237 .current_epoch()
238 .unwrap_or(EpochNumber::genesis());
239
240 if self.consensus.last_decided_leaf().view_number() == ViewNumber::genesis() {
241 let genesis_leaf = self.consensus.last_decided_leaf().clone();
243 let (payload, metadata) = T::BlockPayload::empty();
244 self.storage.append_da(
245 ViewNumber::genesis(),
246 EpochNumber::genesis(),
247 payload,
248 metadata,
249 genesis_leaf.payload_commitment(),
250 );
251
252 self.outbox.push_back(ConsensusOutput::LeafDecided {
254 leaves: vec![genesis_leaf],
255 cert1: self
256 .consensus
257 .cert1_at(ViewNumber::genesis())
258 .cloned()
259 .expect("genesis cert1 must be seeded"),
260 cert2: None,
261 vid_shares: vec![None],
262 });
263 }
264
265 self.outbox
266 .push_back(ConsensusOutput::ViewChanged(next_view, epoch));
267
268 if let Some(leader) = self.leader(next_view, epoch)
269 && leader == self.public_key
270 {
271 let parent_proposal = self
272 .consensus
273 .proposal_at(cur_view)
274 .expect("parent proposal must be seeded before start()")
275 .clone();
276 self.outbox
277 .push_back(ConsensusOutput::RequestBlockAndHeader(
278 BlockAndHeaderRequest {
279 view: next_view,
280 epoch,
281 parent_proposal,
282 },
283 ));
284 }
285 }
286
287 fn resume_after_cutover_tc(&mut self) {
291 let cur_view = self.consensus.current_view();
292 if self.consensus.timeout_cert_at(cur_view).is_none() {
293 return;
294 }
295 let epoch = self
296 .consensus
297 .current_epoch()
298 .unwrap_or(EpochNumber::genesis());
299 let Some(leader) = self.leader(cur_view, epoch) else {
300 return;
301 };
302 if leader != self.public_key {
303 return;
304 }
305 let Some(locked_view) = self.consensus.locked_view() else {
306 return;
307 };
308 let Some(parent_proposal) = self.consensus.proposal_at(locked_view).cloned() else {
309 return;
310 };
311 self.outbox
312 .push_back(ConsensusOutput::RequestBlockAndHeader(
313 BlockAndHeaderRequest {
314 view: cur_view,
315 epoch,
316 parent_proposal,
317 },
318 ));
319 }
320
321 pub async fn stop(mut self) {
322 self.network.shutdown().await
323 }
324
325 pub async fn next_consensus_input(&mut self) -> Result<ConsensusInput<T>, CoordinatorError> {
326 loop {
327 select! {
328 message = self.network.receive() => match message {
329 Ok(m) => {
330 if let Some(input) = self.on_network_message(m).await {
331 return Ok(input)
332 }
333 }
334 Err(e) => {
335 return Err(CoordinatorError::from(e).context("network receive"))
336 }
337 },
338 () = &mut self.timer => {
339 let input = ConsensusInput::Timeout(self.timer.view(), self.timer.epoch());
340 return Ok(input)
341 }
342 Some(output) = self.state_manager.next() => {
343 if let Some(input) = self.on_state_manager_output(output) {
344 return Ok(input)
345 }
346 }
347 Some(request) = self.client.next_request() => {
348 if let Err(err) = self.on_client_request(request).await {
349 error!(%err, "error while handling client request");
350 }
351 }
352 Some(tcert) = self.timeout_collector.next() => {
353 return Ok(ConsensusInput::TimeoutCertificate(tcert))
354 }
355 Some(out) = self.timeout_one_honest_collector.next() => {
356 let Some(epoch) = out.data.epoch else {
357 let msg = format!("missing epoch in view {}", out.view_number());
358 return Err(CoordinatorError::regular(msg).context("gc timeout one honest"))
359 };
360 return Ok(ConsensusInput::TimeoutOneHonest(out.view_number(), epoch))
361 }
362 Some(cert1) = self.vote1_collector.next() => {
363 return Ok(ConsensusInput::Certificate1(cert1))
364 }
365 Some(cert2) = self.vote2_collector.next() => {
366 return Ok(ConsensusInput::Certificate2(cert2))
367 }
368 Some((cert1, state_cert)) = self.epoch_root_collector.next() => {
369 self.storage.append_state_cert(
370 ViewNumber::new(state_cert.light_client_state.view_number),
371 state_cert.clone(),
372 );
373 return Ok(ConsensusInput::EpochRootCertificates { cert1, state_cert })
374 }
375 Some(item) = self.share_validator.next() => match item {
376 Ok(vid_share) => {
377 let view = vid_share.view_number();
378 let Some(validated) = self.cached_validated_proposals.remove(&view) else {
379 self.cached_vid_shares.insert(view, vid_share);
381 continue;
382 };
383 if !check_payload_commitment(&validated.message.proposal, &vid_share) {
384 continue;
385 }
386 return self.on_proposal_and_vid_share(validated, vid_share)
387 },
388 Err(e) => {
389 return Err(CoordinatorError::regular(e).context("vid share validation"))
390 }
391 },
392 Some(item) = self.proposal_validator.next() => match item {
393 Ok(validated) => {
394 let epoch = validated.message.proposal.data.epoch;
396 if let Err(err) = self
397 .network
398 .apply_epoch(epoch, &self.membership_coordinator)
399 {
400 error!(%epoch, %err, "network apply_epoch failed");
401 }
402
403 let view = validated.message.proposal.data.view_number();
404 let Some(vid_share) = self.cached_vid_shares.remove(&view) else {
405 self.cached_validated_proposals.insert(view, validated);
407 continue;
408 };
409 if !check_payload_commitment(&validated.message.proposal, &vid_share) {
411 continue;
412 }
413 return self.on_proposal_and_vid_share(validated, vid_share)
414 }
415 Err(e) => {
416 return Err(CoordinatorError::regular(e).context("proposal validation"))
417 }
418 },
419 Some(cert) = self.checkpoint_collector.next() => {
420 let Some(epoch) = cert.epoch() else {
421 let msg = format!("missing epoch in view {}", cert.view_number());
422 return Err(CoordinatorError::critical(msg).context("gc certificate"))
423 };
424 self.gc(cert.view_number(), epoch);
425 }
426 Some(item) = self.block_builder.next() => match item {
427 Ok(block) => {
428 self.state_manager.request_header(HeaderRequest::from(&block));
429 let next_view = block.view + 1;
430 let epoch = block.epoch;
431 let manifest = block.manifest.clone();
432 self.storage.append_da(
433 block.view,
434 block.epoch,
435 block.payload.payload.clone(),
436 block.payload.metadata.clone(),
437 block.payload_commitment,
438 );
439 self.vid_reconstructor.mark_reconstructed(block.view);
441 self.unicast_to_leader(
442 next_view,
443 epoch,
444 BlockMessage::DedupManifest(manifest),
445 )?;
446 return Ok(block.into())
447 }
448 Err(err) => {
449 return Err(CoordinatorError::regular(err).context("block building"))
450 }
451 },
452 Some(item) = self.vid_disperser.next() => match item {
453 Ok(out) => {
454 return Ok(ConsensusInput::VidDisperseCreated(out.view, out.disperse))
455 }
456 Err(()) => {
457 return Err(CoordinatorError::unspecified().context("vid disperse"))
458 }
459 },
460 Some(item) = self.vid_reconstructor.next() => match item {
461 Ok(out) => {
462 self.block_builder.on_block_reconstructed(out.tx_commitments);
463 self.storage.append_da(
464 out.view,
465 out.epoch,
466 out.payload.clone(),
467 out.metadata.clone(),
468 VidCommitment::V2(out.payload_commitment),
469 );
470 if let Some(proposal) = self.consensus.proposal_at(out.view) {
471 self.outbox.push_back(ConsensusOutput::BlockPayloadReconstructed {
472 view: out.view,
473 header: proposal.block_header.clone(),
474 payload: out.payload,
475 });
476 }
477 return Ok(ConsensusInput::BlockReconstructed(out.view, out.payload_commitment))
478 }
479 Err(()) => {
480 return Err(CoordinatorError::unspecified().context("vid reconstruction"))
481 }
482 },
483 Some(result) = self.epoch_manager.next() => match result {
484 Ok(EpochRootResult::DrbResult(epoch, drb_result)) => {
485 self.vote1_collector.retry_pending_votes().await;
488 self.vote2_collector.retry_pending_votes().await;
489 self.timeout_collector.retry_pending_votes().await;
490 self.timeout_one_honest_collector.retry_pending_votes().await;
491 return Ok(ConsensusInput::DrbResult(epoch, drb_result))
492 }
493 Err(failure) => {
494 warn!(%failure.error, epoch = %failure.epoch, "DRB request failed");
499 continue;
500 }
501 },
502 else => {
503 return Err(CoordinatorError::critical(ErrorSource::NoInput))
504 }
505 }
506 }
507 }
508
509 pub fn apply_consensus(&mut self, input: ConsensusInput<T>) {
510 self.consensus.apply(input, &mut self.outbox)
511 }
512
513 pub fn process_consensus_output(
514 &mut self,
515 output: ConsensusOutput<T>,
516 ) -> Result<(), CoordinatorError> {
517 match output {
518 ConsensusOutput::RequestState(state_request) => {
519 self.state_manager.request_state(state_request);
520 },
521 ConsensusOutput::RequestVidDisperse {
522 view,
523 epoch,
524 payload,
525 metadata,
526 } => {
527 self.vid_disperser.request_vid_disperse(VidDisperseRequest {
528 view,
529 epoch,
530 block: payload,
531 metadata,
532 });
533 },
534 ConsensusOutput::RequestDrbResult(epoch) => {
535 self.epoch_manager.request_drb_result(epoch);
536 },
537 ConsensusOutput::SendCheckpointVote(checkpoint_vote) => {
538 let message = Message {
539 sender: self.public_key.clone(),
540 message_type: MessageType::Consensus(ConsensusMessage::Checkpoint(
541 checkpoint_vote,
542 )),
543 };
544 self.network
545 .broadcast(message.view_number(), &message)
546 .map_err(|e| CoordinatorError::from(e).context("broadcast checkpoint vote"))?
547 },
548 ConsensusOutput::LeafDecided { leaves, cert2, .. } => {
549 if let Some(cert2) = cert2 {
550 self.storage.append_cert2(cert2.view_number, cert2.clone());
551 }
552 for leaf in leaves {
553 self.epoch_manager.handle_leaf_decided(leaf);
554 }
555 },
556 ConsensusOutput::LockUpdated(_) => {}, ConsensusOutput::RequestBlockAndHeader(request) => {
558 self.block_builder.request_block(request);
559 },
560 ConsensusOutput::SendProposal(proposal) => {
561 self.storage.append_proposal(proposal.data.clone());
562 let message = Message {
566 sender: self.public_key.clone(),
567 message_type: MessageType::Consensus(ConsensusMessage::Proposal(
568 ProposalMessage::validated(proposal.clone()),
569 )),
570 };
571 if let Err(err) = self.network.broadcast(message.view_number(), &message) {
572 let err = CoordinatorError::from(err).context("proposal broadcast");
573 if err.severity == Severity::Critical {
574 return Err(err);
575 } else {
576 warn!(%err, "network error while broadcasting proposal")
577 }
578 }
579 },
580 ConsensusOutput::SendVidShares(vid_shares) => {
581 for share in vid_shares {
582 let recipient = share.data.recipient_key.clone();
583 let message = Message {
584 sender: self.public_key.clone(),
585 message_type: MessageType::Consensus(ConsensusMessage::VidShare(share)),
586 };
587 if let Err(err) =
588 self.network
589 .unicast(message.view_number(), &recipient, &message)
590 {
591 let err = CoordinatorError::from(err).context("vid share unicast");
592 if err.severity == Severity::Critical {
593 return Err(err);
594 } else {
595 warn!(%err, "network error while sending vid share")
596 }
597 }
598 }
599 },
600 ConsensusOutput::SendTimeoutVote(vote, lock) => {
601 let message = Message {
602 sender: self.public_key.clone(),
603 message_type: MessageType::Consensus(ConsensusMessage::TimeoutVote(
604 message::TimeoutVoteMessage { vote, lock },
605 )),
606 };
607 self.network
608 .broadcast(message.view_number(), &message)
609 .map_err(|e| CoordinatorError::from(e).context("broadcast timeout vote"))?
610 },
611 ConsensusOutput::SendTimeoutCertificate(tc, view, epoch) => {
612 if let Some(leader) = self.leader(view, epoch) {
613 let message = Message {
614 sender: self.public_key.clone(),
615 message_type: MessageType::Consensus(ConsensusMessage::TimeoutCertificate(
616 tc,
617 )),
618 };
619 self.network
620 .unicast(message.view_number(), &leader, &message)
621 .map_err(|e| CoordinatorError::from(e).context("timeout certificate"))?;
622 }
623 },
624 ConsensusOutput::SendVote1(vote1) => {
625 let message = Message {
626 sender: self.public_key.clone(),
627 message_type: MessageType::Consensus(ConsensusMessage::Vote1(vote1)),
628 };
629 self.network
630 .broadcast(message.view_number(), &message)
631 .map_err(|e| CoordinatorError::from(e).context("broadcast vote1"))?
632 },
633 ConsensusOutput::SendVote2(vote2) => {
634 let message = Message {
635 sender: self.public_key.clone(),
636 message_type: MessageType::Consensus(ConsensusMessage::Vote2(vote2)),
637 };
638 self.network
639 .broadcast(message.view_number(), &message)
640 .map_err(|e| CoordinatorError::from(e).context("broadcast vote2"))?
641 },
642 ConsensusOutput::SendEpochChange(epoch_change) => {
643 let message = Message {
644 sender: self.public_key.clone(),
645 message_type: MessageType::Consensus(ConsensusMessage::EpochChange(
646 epoch_change,
647 )),
648 };
649 self.network
650 .broadcast(message.view_number(), &message)
651 .map_err(|e| CoordinatorError::from(e).context("broadcast epoch change"))?
652 },
653 ConsensusOutput::SendCertificate1(cert1) => {
654 let message = Message {
655 sender: self.public_key.clone(),
656 message_type: MessageType::Consensus(ConsensusMessage::Certificate1(
657 cert1,
658 self.public_key.clone(),
659 )),
660 };
661 self.network
662 .broadcast(message.view_number(), &message)
663 .map_err(|e| CoordinatorError::from(e).context("broadcast certificate1"))?
664 },
665 ConsensusOutput::ProposalValidated { .. } => {},
666 ConsensusOutput::ViewChanged(view, epoch) => {
667 self.consensus.set_view(view, epoch);
668 self.timer.reset_with_epoch(view, epoch);
669 let txns = self.block_builder.on_view_changed(view, epoch);
670 if !txns.is_empty() {
671 let next_view = view + 1;
672 self.unicast_to_leader(
673 next_view,
674 epoch,
675 BlockMessage::Transactions(TransactionMessage {
676 view: next_view,
677 transactions: txns,
678 }),
679 )
680 .map_err(|e| e.context("unicast transactions"))?;
681 }
682
683 let next_epoch = epoch + 1;
686 if next_epoch > EpochNumber::genesis() + 1 {
687 self.epoch_manager.request_drb_result(next_epoch);
688 }
689 },
690 ConsensusOutput::BlockPayloadReconstructed { .. } => {},
691 }
692 Ok(())
693 }
694
695 pub fn node_id(&self) -> &KeyPrefix {
696 &self.node_id
697 }
698
699 pub fn outbox(&self) -> &Outbox<ConsensusOutput<T>> {
700 &self.outbox
701 }
702
703 pub fn outbox_mut(&mut self) -> &mut Outbox<ConsensusOutput<T>> {
704 &mut self.outbox
705 }
706
707 pub fn coordinator_outbox(&self) -> &Outbox<CoordinatorOutput<T>> {
708 &self.coordinator_outbox
709 }
710
711 pub fn coordinator_outbox_mut(&mut self) -> &mut Outbox<CoordinatorOutput<T>> {
712 &mut self.coordinator_outbox
713 }
714
715 pub fn current_view(&self) -> ViewNumber {
716 self.consensus.current_view()
717 }
718
719 pub fn state(&self, v: ViewNumber) -> Option<&StateEntry<T>> {
720 self.state_manager.get_state(v)
721 }
722
723 pub fn client_api(&self) -> &ClientApi<T> {
724 self.client.handle()
725 }
726
727 pub(crate) async fn on_network_message(
728 &mut self,
729 message: Message<T, Unchecked>,
730 ) -> Option<ConsensusInput<T>> {
731 match message.message_type {
732 MessageType::Consensus(msg) => match msg {
733 ConsensusMessage::Proposal(p) => {
734 if self.consensus.wants_proposal_for_view(&p.view_number()) {
735 self.proposal_validator.validate(p);
736 }
737 None
738 },
739 ConsensusMessage::VidShare(share) => {
740 if self
741 .consensus
742 .wants_proposal_for_view(&share.data.view_number())
743 {
744 self.share_validator.validate(share);
745 }
746 None
747 },
748 ConsensusMessage::Vote1(vote1) => {
749 let bn = vote1.vote.data.block_number.unwrap_or(0);
750 let epoch_height = *self.consensus.epoch_height;
751 if is_epoch_root(bn, epoch_height) {
752 vote1.state_vote.as_ref()?;
755 self.epoch_root_collector.accumulate(vote1.clone()).await;
756 } else {
757 self.vote1_collector
758 .accumulate_vote(vote1.vote.clone())
759 .await;
760 }
761 self.vid_reconstructor
762 .handle_vid_share(vote1.vid_share, None);
763 None
764 },
765 ConsensusMessage::Vote2(vote2) => {
766 self.vote2_collector.accumulate_vote(vote2).await;
767 None
768 },
769 ConsensusMessage::Certificate1(certificate1, _key) => {
770 Some(ConsensusInput::Certificate1(certificate1))
771 },
772 ConsensusMessage::Certificate2(certificate2, _key) => {
773 Some(ConsensusInput::Certificate2(certificate2))
774 },
775 ConsensusMessage::TimeoutVote(timeout_msg) => {
776 self.timeout_collector
777 .accumulate_vote(timeout_msg.vote.clone())
778 .await;
779 self.timeout_one_honest_collector
780 .accumulate_vote(timeout_msg.vote)
781 .await;
782 None
783 },
784 ConsensusMessage::TimeoutCertificate(tc) => {
785 Some(ConsensusInput::TimeoutCertificate(tc))
786 },
787 ConsensusMessage::EpochChange(epoch_change) => {
788 Some(ConsensusInput::EpochChange(epoch_change))
789 },
790 ConsensusMessage::Checkpoint(checkpoint) => {
791 self.checkpoint_collector.accumulate_vote(checkpoint).await;
792 None
793 },
794 },
795 MessageType::Block(msg) => {
796 match msg {
797 BlockMessage::Transactions(msg) => self.block_builder.on_transactions(msg),
798 BlockMessage::DedupManifest(manifest) => {
799 if let Some(view_leader) = self.leader(manifest.view, manifest.epoch)
800 && view_leader == message.sender
801 {
802 self.block_builder.on_dedup_manifest(manifest)
803 }
804 },
805 }
806 None
807 },
808 MessageType::ProposalFetch(ProposalFetchMessage::Request(request)) => {
809 if !request.validate_sender(&message.sender) {
810 warn!(
811 sender = %message.sender,
812 view = %request.view_number(),
813 "ignoring invalid proposal fetch request signature"
814 );
815 return None;
816 }
817 if let Some(proposal) = self
818 .consensus
819 .signed_proposal(&request.view_number())
820 .cloned()
821 {
822 let response = Message {
823 sender: self.public_key.clone(),
824 message_type: MessageType::ProposalFetch(ProposalFetchMessage::Response(
825 Box::new(proposal),
826 )),
827 };
828
829 if let Err(err) =
830 self.network
831 .unicast(request.view_number(), &message.sender, &response)
832 {
833 let err = CoordinatorError::from(err).context("proposal response");
834 warn!(%err, "network error while sending proposal response");
835 }
836 }
837 None
838 },
839 MessageType::ProposalFetch(ProposalFetchMessage::Response(proposal)) => {
840 self.pending_proposal_fetches.resolve(&proposal);
841 None
842 },
843 MessageType::External(data) => {
844 self.coordinator_outbox
845 .push_back(CoordinatorOutput::ExternalMessageReceived {
846 sender: message.sender,
847 data,
848 });
849 None
850 },
851 }
852 }
853
854 fn on_state_manager_output(
855 &mut self,
856 output: StateManagerOutput<T>,
857 ) -> Option<ConsensusInput<T>> {
858 match output {
859 StateManagerOutput::State {
860 response,
861 validated: true,
862 } => Some(ConsensusInput::StateValidated(response)),
863 StateManagerOutput::State {
864 response,
865 validated: false,
866 } => Some(ConsensusInput::StateValidationFailed(response)),
867 StateManagerOutput::Header {
868 response,
869 header: Some(hdr),
870 } => Some(ConsensusInput::HeaderCreated(
871 response.view,
872 proposal_commitment(&response.parent_proposal),
873 hdr,
874 )),
875 StateManagerOutput::Header {
876 response,
877 header: None,
878 } => {
879 tracing::warn!(view = %response.view, "header creation failed");
880 None
881 },
882 }
883 }
884
885 fn on_proposal_and_vid_share(
886 &mut self,
887 validated: ValidatedProposal<T>,
888 vid_share: VidDisperseShare2<T>,
889 ) -> Result<ConsensusInput<T>, CoordinatorError> {
890 self.storage.append_vid(vid_share.clone());
891 self.storage
892 .append_proposal(validated.message.proposal.data.clone());
893
894 let m = validated
895 .message
896 .proposal
897 .data
898 .block_header
899 .metadata()
900 .clone();
901 self.vid_reconstructor
902 .handle_vid_share(vid_share.clone(), m);
903
904 let view = validated.message.proposal.data.view_number();
906 self.cached_vid_shares = self.cached_vid_shares.split_off(&(view + 1));
907 self.cached_validated_proposals = self.cached_validated_proposals.split_off(&(view + 1));
908
909 Ok(ConsensusInput::ProposalWithVidShare(
910 validated.sender,
911 validated.message,
912 vid_share,
913 ))
914 }
915
916 fn unicast_to_leader(
917 &mut self,
918 view: ViewNumber,
919 epoch: EpochNumber,
920 msg: BlockMessage<T>,
921 ) -> Result<(), CoordinatorError> {
922 let Some(leader) = self.leader(view, epoch) else {
923 warn!(%view, %epoch, "failed to resolve leader for unicast");
924 return Ok(());
925 };
926 let message = Message {
927 sender: self.public_key.clone(),
928 message_type: MessageType::Block(msg),
929 };
930 self.network
931 .unicast(message.view_number(), &leader, &message)
932 .map_err(|e| CoordinatorError::from(e).context("leader unicast"))
933 }
934
935 fn leader(&mut self, view: ViewNumber, epoch: EpochNumber) -> Option<T::SignatureKey> {
936 let membership = self
937 .membership_coordinator
938 .membership_for_epoch(Some(epoch))
939 .ok()?;
940 membership.leader(view).ok()
941 }
942
943 async fn on_client_request(
944 &mut self,
945 request: ClientRequest<T>,
946 ) -> Result<(), CoordinatorError> {
947 match request {
948 ClientRequest::CurrentView(tx) => {
949 let _ = tx.send(self.consensus.current_view());
950 },
951 ClientRequest::CurrentEpoch(tx) => {
952 let _ = tx.send(self.consensus.current_epoch());
953 },
954 ClientRequest::DecidedLeaf(tx) => {
955 let _ = tx.send(self.consensus.last_decided_leaf().clone());
956 },
957 ClientRequest::DecidedState(tx) => {
958 let view = self.consensus.last_decided_leaf().view_number();
959 let _ = tx.send(self.state(view).map(|s| s.state.clone()));
960 },
961 ClientRequest::UndecidedLeaves(tx) => {
962 let _ = tx.send(self.consensus.undecided_leaves().cloned().collect());
963 },
964 ClientRequest::GetState { view, respond } => {
965 let _ = respond.send(self.state(view).map(|s| s.state.clone()));
966 },
967 ClientRequest::GetStateAndDelta { view, respond } => {
968 let _ = respond.send(match self.state(view) {
969 Some(s) => (Some(s.state.clone()), s.delta.clone()),
970 None => (None, None),
971 });
972 },
973 ClientRequest::SubmitTransaction { tx, respond } => {
974 self.block_builder.on_submit_transaction(tx);
975 let _ = respond.send(());
976 },
977 ClientRequest::UpdateLeaf { update, respond } => {
978 self.state_manager.update_state(update);
979 let _ = respond.send(());
980 },
981 ClientRequest::RequestProposal {
982 view,
983 leaf_commitment,
984 respond,
985 } => {
986 if let Some(proposal) = self.consensus.signed_proposal(&view)
987 && proposal_commitment(&proposal.data) == leaf_commitment
988 {
989 let _ = respond.send(Ok(proposal.clone()));
990 return Ok(());
991 }
992 if !self
993 .pending_proposal_fetches
994 .contains_request(view, leaf_commitment)
995 {
996 let request =
997 self.consensus
998 .signed_proposal_fetch_request(view)
999 .map_err(|err| {
1000 let err = format!("failed to sign proposal request: {err}");
1001 CoordinatorError::regular(err).context("sign proposal request")
1002 })?;
1003
1004 let message = Message {
1005 sender: self.public_key.clone(),
1006 message_type: MessageType::ProposalFetch(ProposalFetchMessage::Request(
1007 request,
1008 )),
1009 };
1010
1011 self.network
1012 .broadcast(message.view_number(), &message)
1013 .map_err(|err| {
1014 CoordinatorError::from(err).context("broadcast proposal request")
1015 })?;
1016 }
1017 self.pending_proposal_fetches
1018 .push(view, leaf_commitment, respond);
1019 },
1020 ClientRequest::SendExternalMessage {
1021 view,
1022 payload,
1023 recipient,
1024 respond,
1025 } => {
1026 let message = Message {
1027 sender: self.public_key.clone(),
1028 message_type: MessageType::External(payload),
1029 };
1030 let result = self
1031 .network
1032 .unicast(view, &recipient, &message)
1033 .map_err(|err| {
1034 CoordinatorError::from(err)
1035 .context("send external message")
1036 .into()
1037 });
1038 let _ = respond.send(result);
1039 },
1040 ClientRequest::SeedPreCutover { seed, respond } => {
1041 tracing::info!(
1042 undecided = seed.undecided.len(),
1043 anchor_view = *seed.decided_anchor.view_number(),
1044 high_qc_view = seed.high_qc.as_ref().map(|qc| *qc.view_number()),
1045 cutover_view = *seed.cutover_view,
1046 states = seed.validated_states.len(),
1047 "coordinator: applying legacy → new-protocol seed",
1048 );
1049
1050 let anchor_view = seed.decided_anchor.view_number();
1054 if let Some(state) = seed.validated_states.get(&anchor_view).cloned() {
1055 self.state_manager
1056 .seed_state(anchor_view, state, seed.decided_anchor.clone());
1057 }
1058 for leaf in &seed.undecided {
1059 let view = leaf.view_number();
1060 if let Some(state) = seed.validated_states.get(&view).cloned() {
1061 self.state_manager.seed_state(view, state, leaf.clone());
1062 }
1063 }
1064
1065 let highest_seeded_leaf = seed.undecided.last().unwrap_or(&seed.decided_anchor);
1066 let cutover_epoch = EpochNumber::new(epoch_from_block_number(
1067 highest_seeded_leaf.block_header().block_number(),
1068 *self.consensus.epoch_height,
1069 ));
1070 let cutover_view = seed.cutover_view;
1071
1072 self.consensus.apply_pre_cutover_seed(seed);
1073
1074 if let Err(err) = self
1077 .network
1078 .apply_epoch(cutover_epoch, &self.membership_coordinator)
1079 {
1080 tracing::error!(
1081 %cutover_epoch,
1082 %err,
1083 "network on_epoch_change failed during seed_pre_cutover",
1084 );
1085 }
1086
1087 let cur_view = self.consensus.current_view();
1088 if self.consensus.timeout_cert_at(cur_view).is_some() {
1089 self.resume_after_cutover_tc();
1090 } else if cur_view + 1 == cutover_view
1091 && self.consensus.cert1_at(cur_view).is_some()
1092 && self.consensus.proposal_at(cur_view).is_some()
1093 {
1094 self.start();
1095 } else {
1096 let epoch = self
1097 .consensus
1098 .current_epoch()
1099 .unwrap_or(EpochNumber::genesis());
1100 self.outbox
1101 .push_back(ConsensusOutput::ViewChanged(cur_view, epoch));
1102 }
1103 while let Some(output) = self.outbox.pop_front() {
1104 if let Err(err) = self.process_consensus_output(output) {
1105 tracing::warn!(
1106 %err,
1107 "error processing post-seed bootstrap output"
1108 );
1109 }
1110 }
1111 let _ = respond.send(());
1112 },
1113 ClientRequest::SubmitTimeoutVote { vote, respond } => {
1114 self.timeout_collector.accumulate_vote(vote.clone()).await;
1115 self.timeout_one_honest_collector
1116 .accumulate_vote(vote.clone())
1117 .await;
1118 let message = Message {
1120 sender: self.public_key.clone(),
1121 message_type: MessageType::Consensus(ConsensusMessage::TimeoutVote(
1122 message::TimeoutVoteMessage { vote, lock: None },
1123 )),
1124 };
1125 if let Err(err) = self.network.broadcast(message.view_number(), &message) {
1126 tracing::warn!(%err, "failed to rebroadcast bridged timeout vote");
1127 }
1128 let _ = respond.send(());
1129 },
1130 ClientRequest::BumpNetworkEpoch { epoch, respond } => {
1131 if let Err(err) = self
1132 .network
1133 .apply_epoch(epoch, &self.membership_coordinator)
1134 {
1135 tracing::warn!(%epoch, %err, "network on_epoch_change failed");
1136 }
1137 let _ = respond.send(());
1138 },
1139 }
1140
1141 Ok(())
1142 }
1143
1144 fn gc(&mut self, view: ViewNumber, epoch: EpochNumber) {
1145 info!(node = %self.node_id, %view, "garbage collecting");
1146 self.consensus.gc(view, epoch);
1147 self.checkpoint_collector.gc(view, epoch);
1148 let _ = self.network.gc(view); self.state_manager.gc(view);
1150 self.vid_disperser.gc(view);
1151 self.vid_reconstructor.gc(view);
1152 self.vote1_collector.gc(view, epoch);
1153 self.vote2_collector.gc(view, epoch);
1154 self.timeout_collector.gc(view, epoch);
1155 self.timeout_one_honest_collector.gc(view, epoch);
1156 self.epoch_root_collector.gc(view, epoch);
1157 self.epoch_manager.gc(epoch);
1158 self.block_builder.gc(view);
1159 self.pending_proposal_fetches.gc(view);
1160 self.storage.gc(view);
1161 self.cached_validated_proposals = self.cached_validated_proposals.split_off(&view);
1162 self.cached_vid_shares = self.cached_vid_shares.split_off(&view);
1163 }
1164}
1165
1166fn check_payload_commitment<T: NodeType>(
1167 proposal: &SignedProposal<T, Proposal<T>>,
1168 vid_share: &VidDisperseShare2<T>,
1169) -> bool {
1170 let VidCommitment::V2(commit) = proposal.data.block_header.payload_commitment() else {
1171 warn!(
1172 "unexpected payload commitment type in view {}, proposal discarded",
1173 proposal.data.view_number
1174 );
1175 return false;
1176 };
1177 if commit != vid_share.payload_commitment {
1178 warn!(
1179 "payload commitment mismatch in view {}, discard the proposal",
1180 proposal.data.view_number
1181 );
1182 return false;
1183 }
1184 true
1185}
1186
/// One-shot channel sender used to answer a pending proposal fetch: it carries either the
/// fetched signed proposal or a [`QueryError`].
type ProposalFetchResponseSender<T> =
    oneshot::Sender<Result<SignedProposal<T, Proposal<T>>, QueryError>>;
1189
/// Lookup key for a pending proposal fetch: the pair of view number and leaf commitment.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct ProposalFetchKey<T: NodeType> {
    /// View number of the requested proposal.
    view: ViewNumber,
    /// Commitment of the leaf the requested proposal must correspond to.
    leaf_commitment: Commitment<Leaf2<T>>,
}
1195
1196impl<T: NodeType> ProposalFetchKey<T> {
1197 fn new(view: ViewNumber, leaf_commitment: Commitment<Leaf2<T>>) -> Self {
1198 Self {
1199 view,
1200 leaf_commitment,
1201 }
1202 }
1203}
1204
/// Registry of proposal fetches that are still waiting for an answer.
///
/// Several callers may wait on the same (view, leaf commitment) pair, so each key maps to a
/// list of response senders.
#[derive(Default)]
struct PendingProposalFetches<T: NodeType> {
    /// Response senders waiting on each requested (view, leaf commitment) pair.
    pending: HashMap<ProposalFetchKey<T>, Vec<ProposalFetchResponseSender<T>>>,
}
1209
1210impl<T: NodeType> PendingProposalFetches<T> {
1211 fn prune_closed(&mut self) {
1212 self.pending.retain(|_, responders| {
1213 responders.retain(|respond| !respond.is_closed());
1214 !responders.is_empty()
1215 });
1216 }
1217
1218 fn contains_request(
1219 &mut self,
1220 view: ViewNumber,
1221 leaf_commitment: Commitment<Leaf2<T>>,
1222 ) -> bool {
1223 self.prune_closed();
1224 self.pending
1225 .contains_key(&ProposalFetchKey::new(view, leaf_commitment))
1226 }
1227
1228 fn push(
1229 &mut self,
1230 view: ViewNumber,
1231 leaf_commitment: Commitment<Leaf2<T>>,
1232 respond: ProposalFetchResponseSender<T>,
1233 ) {
1234 self.pending
1235 .entry(ProposalFetchKey::new(view, leaf_commitment))
1236 .or_default()
1237 .push(respond);
1238 }
1239
1240 #[allow(dead_code)]
1241 fn gc(&mut self, view: ViewNumber) {
1242 self.pending.retain(|key, responders| {
1243 responders.retain(|respond| !respond.is_closed());
1244 key.view >= view && !responders.is_empty()
1245 });
1246 }
1247
1248 fn resolve(&mut self, proposal: &SignedProposal<T, Proposal<T>>) {
1249 self.prune_closed();
1250 let view = proposal.data.view_number;
1251 let leaf_commitment = proposal_commitment(&proposal.data);
1252 let key = ProposalFetchKey::new(view, leaf_commitment);
1253
1254 if let Some(responders) = self.pending.remove(&key) {
1255 for respond in responders {
1256 let _ = respond.send(Ok(proposal.clone()));
1257 }
1258 }
1259 }
1260}