1mod external_event_handler;
2mod message_compat_tests;
3mod proposal_fetcher;
4mod request_response;
5mod startup_catchup;
6
7pub mod api;
8pub mod catchup;
9pub mod consensus_handle;
10pub mod context;
11pub mod genesis;
12pub use espresso_keyset as keyset;
13pub mod network;
14pub mod options;
15pub mod persistence;
16pub mod run;
17pub mod state;
18pub mod state_cert;
19pub mod state_signature;
20pub mod util;
21
22use std::{fmt::Debug, marker::PhantomData, num::NonZeroU64, sync::Arc, time::Duration};
23
24use alloy::primitives::U256;
25use anyhow::Context;
26use async_lock::Mutex;
27use catchup::{ParallelStateCatchup, StatePeers};
28use context::SequencerContext;
29use derivative::Derivative;
30use dyn_clone::clone_box;
31use espresso_types::{
32 BackoffParams, EpochCommittees, EpochRewardsCalculator, L1ClientOptions, NodeState, PubKey,
33 SeqTypes, ValidatedState,
34 traits::{EventConsumer, MembershipPersistence},
35 v0::traits::SequencerPersistence,
36 v0_3::Fetcher,
37};
38pub use genesis::Genesis;
39use genesis::L1Finalized;
40use hotshot::{
41 traits::implementations::{
42 CdnMetricsValue, CdnTopic, CombinedNetworks, GossipConfig, KeyPair, Libp2pNetwork,
43 MemoryNetwork, PushCdnNetwork, RequestResponseConfig, WrappedSignatureKey,
44 derive_libp2p_multiaddr, derive_libp2p_peer_id,
45 },
46 types::SignatureKey,
47};
48use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
49use hotshot_new_protocol::network::cliquenet::Cliquenet;
50use hotshot_orchestrator::client::{OrchestratorClient, get_complete_config};
51use hotshot_types::{
52 ValidatorConfig,
53 addr::NetAddr,
54 data::ViewNumber,
55 epoch_membership::EpochMembershipCoordinator,
56 light_client::{StateKeyPair, StateSignKey},
57 message::UpgradeLock,
58 signature_key::{BLSPrivKey, BLSPubKey},
59 traits::{
60 metrics::{Metrics, NoMetrics},
61 network::ConnectedNetwork,
62 node_implementation::{NodeImplementation, NodeType},
63 storage::Storage,
64 },
65 utils::BuilderCommitment,
66 x25519,
67};
68use libp2p::Multiaddr;
69use moka::future::Cache;
70use network::libp2p::split_off_peer_id;
71use options::Identity;
72pub use options::Options;
73use proposal_fetcher::ProposalFetcherConfig;
74pub use run::main;
75use serde::{Deserialize, Serialize};
76use tokio::select;
77use tracing::info;
78use url::Url;
79use vbs::version::StaticVersion;
80
81use crate::request_response::data_source::Storage as RequestResponseStorage;
82
/// Limit passed to `EpochCommittees::reload_stake` when a node starts up (see `init_node`).
// NOTE(review): presumably the number of most recent stake tables to reload — confirm against
// `reload_stake`.
pub const RECENT_STAKE_TABLES_LIMIT: u64 = 20;
84
/// Zero-sized marker type that selects the network (`N`) and persistence (`P`) implementations
/// of a sequencer node.
///
/// The type parameters only appear inside `PhantomData<fn(&N, &P)>`, so no value of `N` or `P`
/// is stored. The `derivative` attributes use empty bounds so the derived traits do not require
/// `N` or `P` to implement them.
#[derive(Derivative, Serialize, Deserialize)]
#[derivative(
    Copy(bound = ""),
    Debug(bound = ""),
    Default(bound = ""),
    PartialEq(bound = ""),
    Eq(bound = ""),
    Hash(bound = "")
)]
pub struct Node<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(PhantomData<fn(&N, &P)>);
96
// `Clone` is written by hand so that it carries no `N: Clone` / `P: Clone` requirement; `Node`
// is `Copy`, so cloning is a plain copy.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Clone for Node<N, P> {
    /// Returns a copy of this marker value.
    fn clone(&self) -> Self {
        *self
    }
}
104
/// Static version `0.1`, used in this file as the version parameter of [`StatePeers`] clients.
pub type SequencerApiVersion = StaticVersion<0, 1>;
106
/// Wires the marker type into HotShot: the network is `N` and the storage is a shared `Arc<P>`.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> NodeImplementation<SeqTypes>
    for Node<N, P>
{
    type Network = N;
    type Storage = Arc<P>;
}
113
/// Networking, key and catchup parameters consumed by [`init_node`].
///
/// The per-field notes below describe how `init_node` uses each value.
#[derive(Clone, Debug)]
pub struct NetworkParams {
    /// Endpoint handed to `PushCdnNetwork::new`.
    pub cdn_endpoint: String,
    /// URL used to construct the `OrchestratorClient`.
    pub orchestrator_url: Url,
    /// URL of the state relay server, forwarded to `SequencerContext::init`.
    pub state_relay_server_url: Url,

    /// When non-empty, replaces the builder URLs of the loaded network config.
    pub builder_urls: Vec<Url>,

    /// BLS staking key; the node's public key is derived from it.
    pub private_staking_key: BLSPrivKey,
    /// Signing key from which the state key pair is built.
    pub private_state_key: StateSignKey,
    /// Peer URLs used to build the remote state catchup provider.
    pub state_peers: Vec<Url>,
    /// Peers to fetch the network config from when none is found in persistence. If `None`
    /// (and nothing is persisted), the config is obtained from the orchestrator instead.
    pub config_peers: Option<Vec<Url>>,
    /// Backoff parameters passed to the peer-based and local catchup providers.
    pub catchup_backoff: BackoffParams,
    /// Base timeout passed to `StatePeers::from_urls`.
    pub catchup_base_timeout: Duration,
    /// Timeout passed to `ParallelStateCatchup::new`.
    pub local_catchup_timeout: Duration,
    /// Forwarded unchanged to `SequencerContext::init`.
    pub bootstrap_epoch_catchup_timeout: Duration,
    /// Forwarded unchanged to `SequencerContext::init`.
    pub new_protocol_consensus_gc_interval: NonZeroU64,
    // NOTE(review): not read by `init_node`; presumably the URL at which this node's API is
    // publicly reachable — confirm against other users of this struct.
    pub public_api_url: Option<Url>,
    /// Address passed to `Cliquenet::create` as the bind address.
    pub cliquenet_bind_addr: NetAddr,
    /// Address registered as this validator's `p2p_addr`. Required when bootstrapping from the
    /// orchestrator and the genesis base version is at least `NEW_PROTOCOL_VERSION`.
    pub cliquenet_advertise_addr: Option<NetAddr>,
    /// x25519 secret key, used for Cliquenet and for the validator's x25519 key pair.
    pub x25519_secret_key: x25519::SecretKey,
    /// Address advertised for libp2p. Required when bootstrapping from the orchestrator; it is
    /// only used as an external address if it is not detected as non-global.
    pub libp2p_advertise_address: Option<String>,
    /// Address the libp2p network binds to.
    pub libp2p_bind_address: String,
    /// When set, replaces the bootstrap nodes in the loaded libp2p config.
    pub libp2p_bootstrap_nodes: Option<Vec<Multiaddr>>,

    // The remaining `libp2p_*` fields (except `libp2p_max_direct_transmit_size`) are copied
    // one-to-one into the libp2p `GossipConfig`.
    /// Copied to `GossipConfig::heartbeat_interval`.
    pub libp2p_heartbeat_interval: Duration,

    /// Copied to `GossipConfig::history_gossip`.
    pub libp2p_history_gossip: usize,
    /// Copied to `GossipConfig::history_length`.
    pub libp2p_history_length: usize,

    /// Copied to `GossipConfig::mesh_n`.
    pub libp2p_mesh_n: usize,
    /// Copied to `GossipConfig::mesh_n_high`.
    pub libp2p_mesh_n_high: usize,
    /// Copied to `GossipConfig::mesh_n_low`.
    pub libp2p_mesh_n_low: usize,
    /// Copied to `GossipConfig::mesh_outbound_min`.
    pub libp2p_mesh_outbound_min: usize,

    /// Copied to `GossipConfig::max_transmit_size`.
    pub libp2p_max_gossip_transmit_size: usize,

    /// Used as both the request and the response size maximum of `RequestResponseConfig`.
    pub libp2p_max_direct_transmit_size: u64,

    /// Copied to `GossipConfig::max_ihave_length`.
    pub libp2p_max_ihave_length: usize,

    /// Copied to `GossipConfig::max_ihave_messages`.
    pub libp2p_max_ihave_messages: usize,

    /// Copied to `GossipConfig::published_message_ids_cache_time`.
    pub libp2p_published_message_ids_cache_time: Duration,

    /// Copied to `GossipConfig::iwant_followup_time`.
    pub libp2p_iwant_followup_time: Duration,

    /// Copied to `GossipConfig::max_messages_per_rpc`.
    pub libp2p_max_messages_per_rpc: Option<usize>,

    /// Copied to `GossipConfig::gossip_retransmission`.
    pub libp2p_gossip_retransmission: u32,

    /// Copied to `GossipConfig::flood_publish`.
    pub libp2p_flood_publish: bool,

    /// Copied to `GossipConfig::duplicate_cache_time`.
    pub libp2p_duplicate_cache_time: Duration,

    /// Copied to `GossipConfig::fanout_ttl`.
    pub libp2p_fanout_ttl: Duration,

    /// Copied to `GossipConfig::heartbeat_initial_delay`.
    pub libp2p_heartbeat_initial_delay: Duration,

    /// Copied to `GossipConfig::gossip_factor`.
    pub libp2p_gossip_factor: f64,

    /// Copied to `GossipConfig::gossip_lazy`.
    pub libp2p_gossip_lazy: usize,
}
215
/// Parameters for connecting to the L1 chain; consumed by [`init_node`] to build the L1 client.
pub struct L1Params {
    /// Provider URLs passed to `L1ClientOptions::connect`.
    pub urls: Vec<Url>,
    /// Client options; metrics are attached before connecting.
    pub options: L1ClientOptions,
}
220
/// Initializes a sequencer node on the production network stack and returns its
/// [`SequencerContext`].
///
/// In order, this function:
/// 1. publishes build, identity and key information as text metrics;
/// 2. derives the libp2p bind/advertise addresses and peer ID;
/// 3. obtains the network config from persistence, from `config_peers`, or from the
///    orchestrator, then overrides parts of it from `genesis` and `network_params`;
/// 4. connects to L1, validates the fee contract and waits for the L1 genesis block;
/// 5. sets up state catchup providers, the stake-table fetcher and the epoch membership;
/// 6. starts the CDN + libp2p combined network and Cliquenet;
/// 7. builds the context, optionally deferring start until the orchestrator is ready.
///
/// # Errors
///
/// Returns an error if any of the fallible steps above fails, e.g. address or peer ID
/// derivation, loading/fetching/saving the network config, creating the CDN, libp2p or
/// Cliquenet networks, connecting to L1, validating the fee contract, or
/// `SequencerContext::init`. It also fails when bootstrapping from the orchestrator without
/// the required advertise addresses.
///
/// # Panics
///
/// The conversion of `network_params.builder_urls` is unwrapped; it is only attempted when
/// the list is non-empty (presumably the only failure case of that conversion — confirm).
#[allow(clippy::too_many_arguments)]
pub async fn init_node<P>(
    genesis: Genesis,
    network_params: NetworkParams,
    metrics: Box<dyn Metrics>,
    mut persistence: P,
    l1_params: L1Params,
    storage: Option<RequestResponseStorage>,
    event_consumer: impl EventConsumer + 'static,
    is_da: bool,
    identity: Identity,
    proposal_fetcher_config: ProposalFetcherConfig,
) -> anyhow::Result<SequencerContext<network::Production, P>>
where
    P: SequencerPersistence + MembershipPersistence + DhtPersistentStorage,
    Arc<P>: Storage<SeqTypes>,
{
    // Expose git/build information as labelled text metrics.
    let info = espresso_utils::build_info!();
    metrics
        .text_family(
            "version".into(),
            vec!["rev".into(), "desc".into(), "timestamp".into()],
        )
        .create(vec![
            info.git_sha.into(),
            info.git_describe.into(),
            info.git_commit_timestamp.into(),
        ]);

    metrics
        .text_family(
            "build_info".into(),
            vec![
                "modified".into(),
                "branch".into(),
                "debug".into(),
                "features".into(),
            ],
        )
        .create(vec![
            info.git_dirty.into(),
            info.git_branch.into(),
            info.is_debug.to_string(),
            env!("VERGEN_CARGO_FEATURES").into(),
        ]);

    // Expose the operator-supplied identity; unset fields are published as empty strings.
    metrics
        .text_family(
            "node_identity_general".into(),
            vec![
                "name".into(),
                "description".into(),
                "company_name".into(),
                "company_website".into(),
                "operating_system".into(),
                "node_type".into(),
                "network_type".into(),
            ],
        )
        .create(vec![
            identity.node_name.unwrap_or_default(),
            identity.node_description.unwrap_or_default(),
            identity.company_name.unwrap_or_default(),
            identity
                .company_website
                .map(|u| u.into())
                .unwrap_or_default(),
            identity.operating_system.unwrap_or_default(),
            identity.node_type.unwrap_or_default(),
            identity.network_type.unwrap_or_default(),
        ]);

    metrics
        .text_family(
            "node_identity_location".into(),
            vec!["country".into(), "latitude".into(), "longitude".into()],
        )
        .create(vec![
            identity.country_code.unwrap_or_default(),
            identity.latitude.map(|l| l.to_string()).unwrap_or_default(),
            identity
                .longitude
                .map(|l| l.to_string())
                .unwrap_or_default(),
        ]);

    // The "small" labels are fed from the 14x14 icons and the "large" labels from the 24x24
    // icons.
    metrics
        .text_family(
            "node_identity_icon".into(),
            vec![
                "small_1x".into(),
                "small_2x".into(),
                "small_3x".into(),
                "large_1x".into(),
                "large_2x".into(),
                "large_3x".into(),
            ],
        )
        .create(vec![
            identity
                .icon_14x14_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
        ]);

    // Publish the node's public staking key, derived from the private key.
    let pub_key = BLSPubKey::from_private(&network_params.private_staking_key);
    metrics
        .text_family("node".into(), vec!["key".into()])
        .create(vec![pub_key.to_string()]);

    // Turn the configured libp2p bind/advertise addresses into multiaddrs.
    let libp2p_bind_address = derive_libp2p_multiaddr(&network_params.libp2p_bind_address)
        .with_context(|| {
            format!(
                "Failed to derive Libp2p bind address of {}",
                &network_params.libp2p_bind_address
            )
        })?;
    let advertise_multiaddr = network_params
        .libp2p_advertise_address
        .as_ref()
        .map(|addr| {
            derive_libp2p_multiaddr(addr)
                .with_context(|| format!("Failed to derive Libp2p advertise address of {addr}"))
        })
        .transpose()?;
    // The advertise address counts as global unless it parses as a `NetAddr` that is
    // recognizably non-global. An unset or unparsable address falls into the `true` branch.
    let advertise_is_global = match network_params
        .libp2p_advertise_address
        .as_deref()
        .and_then(|s| s.parse::<NetAddr>().ok())
    {
        Some(parsed) if !parsed.is_probably_global() => {
            tracing::error!(
                "Libp2p advertise address {parsed} is probably not publicly routable. This is \
                 fine for local testing (demo-native, docker-compose) but is wrong for any real \
                 deployment: remote peers will fail to dial us."
            );
            false
        },
        _ => true,
    };

    // Announce addresses always include the advertise address (if any); they are what gets
    // registered with the orchestrator below.
    let libp2p_announce_addresses: Vec<Multiaddr> = advertise_multiaddr.iter().cloned().collect();

    // External addresses are handed to the libp2p network and are left empty when the
    // advertise address was flagged as non-global.
    let libp2p_external_addresses: Vec<Multiaddr> = if advertise_is_global {
        advertise_multiaddr.iter().cloned().collect()
    } else {
        Vec::new()
    };

    info!("Libp2p bind address: {}", libp2p_bind_address);
    info!("Libp2p announce addresses: {:?}", libp2p_announce_addresses);
    info!("Libp2p external addresses: {:?}", libp2p_external_addresses);

    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
    let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);

    // The x25519 key pair and p2p address start out unset; they are only filled in on the
    // orchestrator bootstrap path below.
    let mut validator_config = ValidatorConfig {
        public_key: pub_key,
        private_key: network_params.private_staking_key,
        stake_value: U256::ONE,
        state_public_key: state_key_pair.ver_key(),
        state_private_key: state_key_pair.sign_key(),
        is_da,
        x25519_keypair: None,
        p2p_addr: None,
    };

    let libp2p_public_key = derive_libp2p_peer_id::<<SeqTypes as NodeType>::SignatureKey>(
        &validator_config.private_key,
    )
    .with_context(|| "Failed to derive Libp2p peer ID")?;

    info!("Starting Libp2p with PeerID: {libp2p_public_key}");

    // Resolve the network config. The tuple is
    // `(config, wait_for_orchestrator, persist_config)`.
    let loaded_network_config_from_persistence = persistence.load_config().await?;
    let (mut network_config, wait_for_orchestrator, persist_config) = match (
        loaded_network_config_from_persistence,
        network_params.config_peers,
    ) {
        // A config already in storage takes precedence and is not saved again.
        (Some(config), _) => {
            tracing::warn!("loaded network config from storage, rejoining existing network");
            (config, false, false)
        },
        // Nothing stored, but peers are configured: fetch the config from them.
        (None, Some(peers)) => {
            tracing::warn!(?peers, "loading network config from peers");
            let peers = StatePeers::<SequencerApiVersion>::from_urls(
                peers,
                network_params.catchup_backoff,
                network_params.catchup_base_timeout,
                &NoMetrics,
            );
            let config = peers.fetch_config(validator_config.clone()).await?;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            (config, false, true)
        },
        // Nothing stored and no peers: register with the orchestrator and wait for the
        // complete config.
        (None, None) => {
            tracing::warn!("loading network config from orchestrator");
            tracing::warn!(
                "waiting for other nodes to connect, DO NOT RESTART until fully connected"
            );

            // From the new protocol version on, the validator config must also carry the
            // x25519 key pair and the Cliquenet advertise address.
            if genesis.base_version >= versions::NEW_PROTOCOL_VERSION {
                let advertise_addr = network_params.cliquenet_advertise_addr.clone().context(
                    "ESPRESSO_NODE_CLIQUENET_ADVERTISE_ADDRESS must be set when bootstrapping a \
                     Cliquenet network from the orchestrator",
                )?;
                validator_config.x25519_keypair =
                    Some(x25519::Keypair::from(&network_params.x25519_secret_key));
                validator_config.p2p_addr = Some(advertise_addr);
            }

            let bootstrap_advertise_addr = libp2p_announce_addresses.first().cloned().context(
                "ESPRESSO_NODE_LIBP2P_ADVERTISE_ADDRESS must be set when bootstrapping a libp2p \
                 network from the orchestrator",
            )?;

            // Only the first element of the returned tuple (the config) is used.
            let config = get_complete_config(
                &orchestrator_client,
                validator_config.clone(),
                Some(bootstrap_advertise_addr),
                Some(libp2p_public_key),
            )
            .await?
            .0;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            tracing::warn!("all nodes connected");
            (config, true, true)
        },
    };

    // If genesis defines an upgrade for the target upgrade version, let it adjust the HotShot
    // config.
    if let Some(upgrade) = genesis.upgrades.get(&genesis.upgrade_version) {
        upgrade.set_hotshot_config_parameters(&mut network_config.config);
    }

    // Explicitly configured builder URLs override the ones from the loaded config.
    if !network_params.builder_urls.is_empty() {
        network_config.config.builder_urls = network_params.builder_urls.try_into().unwrap();
    }

    // Epoch/DRB/stake-table parameters always come from genesis, using defaults when unset.
    let epoch_height = genesis.epoch_height.unwrap_or_default();
    let drb_difficulty = genesis.drb_difficulty.unwrap_or_default();
    let drb_upgrade_difficulty = genesis.drb_upgrade_difficulty.unwrap_or_default();
    let epoch_start_block = genesis.epoch_start_block.unwrap_or_default();
    let stake_table_capacity = genesis
        .stake_table_capacity
        .unwrap_or(hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY);

    let version_upgrade = versions::Upgrade::new(genesis.base_version, genesis.upgrade_version);

    tracing::warn!("setting epoch_height={epoch_height:?}");
    tracing::warn!("setting drb_difficulty={drb_difficulty:?}");
    tracing::warn!("setting drb_upgrade_difficulty={drb_upgrade_difficulty:?}");
    tracing::warn!("setting epoch_start_block={epoch_start_block:?}");
    tracing::warn!("setting stake_table_capacity={stake_table_capacity:?}");
    tracing::warn!("setting version_upgrade={version_upgrade}");
    network_config.config.epoch_height = epoch_height;
    network_config.config.drb_difficulty = drb_difficulty;
    network_config.config.drb_upgrade_difficulty = drb_upgrade_difficulty;
    network_config.config.epoch_start_block = epoch_start_block;
    network_config.config.stake_table_capacity = stake_table_capacity;

    if let Some(da_committees) = &genesis.da_committees {
        tracing::warn!("setting da_committees from genesis: {da_committees:?}");
        network_config.config.da_committees = da_committees.clone();
    }

    // Save the config only when it was freshly fetched (from peers or the orchestrator). The
    // bootstrap-node override below happens after this point and is therefore not persisted.
    if persist_config {
        persistence.save_config(&network_config).await?;
    }

    // Explicitly supplied bootstrap nodes replace those in the libp2p config, if there is one.
    if let Some(bootstrap_nodes) = network_params.libp2p_bootstrap_nodes {
        if let Some(libp2p_config) = network_config.libp2p_config.as_mut() {
            libp2p_config.bootstrap_nodes = bootstrap_nodes
                .into_iter()
                .map(split_off_peer_id)
                .collect::<Result<Vec<_>, _>>()
                .with_context(|| "Failed to parse peer ID from bootstrap node")?;
        } else {
            tracing::warn!("No libp2p configuration found, ignoring supplied bootstrap nodes");
        }
    }

    let node_index = network_config.node_index;

    // Every node uses the global CDN topic; DA nodes additionally use the DA topic.
    let topics = {
        let mut topics = vec![CdnTopic::Global];
        if is_da {
            topics.push(CdnTopic::Da);
        }
        topics
    };

    let cdn_network = PushCdnNetwork::new(
        network_params.cdn_endpoint,
        topics,
        KeyPair {
            public_key: WrappedSignatureKey(validator_config.public_key),
            private_key: validator_config.private_key.clone(),
        },
        CdnMetricsValue::new(&*metrics),
    )
    .with_context(|| format!("Failed to create CDN network {node_index}"))?;

    // Gossip settings are taken one-to-one from the network parameters.
    let gossip_config = GossipConfig {
        heartbeat_interval: network_params.libp2p_heartbeat_interval,
        history_gossip: network_params.libp2p_history_gossip,
        history_length: network_params.libp2p_history_length,
        mesh_n: network_params.libp2p_mesh_n,
        mesh_n_high: network_params.libp2p_mesh_n_high,
        mesh_n_low: network_params.libp2p_mesh_n_low,
        mesh_outbound_min: network_params.libp2p_mesh_outbound_min,
        max_ihave_messages: network_params.libp2p_max_ihave_messages,
        max_transmit_size: network_params.libp2p_max_gossip_transmit_size,
        max_ihave_length: network_params.libp2p_max_ihave_length,
        published_message_ids_cache_time: network_params.libp2p_published_message_ids_cache_time,
        iwant_followup_time: network_params.libp2p_iwant_followup_time,
        max_messages_per_rpc: network_params.libp2p_max_messages_per_rpc,
        gossip_retransmission: network_params.libp2p_gossip_retransmission,
        flood_publish: network_params.libp2p_flood_publish,
        duplicate_cache_time: network_params.libp2p_duplicate_cache_time,
        fanout_ttl: network_params.libp2p_fanout_ttl,
        heartbeat_initial_delay: network_params.libp2p_heartbeat_initial_delay,
        gossip_factor: network_params.libp2p_gossip_factor,
        gossip_lazy: network_params.libp2p_gossip_lazy,
    };

    // The same size limit applies to direct requests and responses.
    let request_response_config = RequestResponseConfig {
        request_size_maximum: network_params.libp2p_max_direct_transmit_size,
        response_size_maximum: network_params.libp2p_max_direct_transmit_size,
    };

    let l1_client = l1_params
        .options
        .with_metrics(&*metrics)
        .connect(l1_params.urls)
        .with_context(|| "failed to create L1 client")?;

    info!("Validating fee contract");

    genesis.validate_fee_contract(&l1_client).await?;

    info!("Fee contract validated. Spawning L1 tasks");

    l1_client.spawn_tasks().await;

    info!(
        "L1 tasks spawned. Waiting for L1 genesis: {:?}",
        genesis.l1_finalized
    );

    // The L1 genesis block is either given directly or awaited by number or timestamp.
    let l1_genesis = match genesis.l1_finalized {
        L1Finalized::Block(b) => b,
        L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
        L1Finalized::Timestamp { timestamp } => {
            l1_client
                .wait_for_finalized_block_with_timestamp(U256::from(timestamp.unix_timestamp()))
                .await
        },
    };

    info!("L1 genesis found: {:?}", l1_genesis);

    // Build the genesis state from the genesis header's chain config plus prefunded accounts.
    let genesis_chain_config = genesis.header.chain_config;
    let mut genesis_state = ValidatedState {
        chain_config: genesis_chain_config.into(),
        ..Default::default()
    };
    for (address, amount) in genesis.accounts {
        tracing::warn!(%address, %amount, "Prefunding account for demo");
        genesis_state.prefund_account(address, amount);
    }

    // Catchup starts with no providers; the remote peers are added first and the local
    // (persistence-backed) provider second, if it can be created.
    let state_catchup_providers =
        ParallelStateCatchup::new(&[], network_params.local_catchup_timeout);

    let state_peers = StatePeers::<SequencerApiVersion>::from_urls(
        network_params.state_peers,
        network_params.catchup_backoff,
        network_params.catchup_base_timeout,
        &*metrics,
    );
    state_catchup_providers.add_provider(Arc::new(state_peers));

    match persistence
        .clone()
        .into_catchup_provider(network_params.catchup_backoff)
    {
        Ok(catchup) => {
            state_catchup_providers.add_provider(Arc::new(catchup));
        },
        // Best effort: without a local provider the node relies on remote catchup only.
        Err(e) => {
            tracing::warn!(
                "Failed to create local catchup provider: {e:#}. Only using remote catchup."
            );
        },
    };

    persistence.enable_metrics(&*metrics);

    let fetcher = Fetcher::new(
        Arc::new(state_catchup_providers.clone()),
        Arc::new(Mutex::new(persistence.clone())),
        l1_client.clone(),
        genesis.chain_config,
    );

    info!("Spawning update loop");

    fetcher.spawn_update_loop().await;
    info!("Update loop spawned. Fetching block reward");

    // A failure to fetch the fixed block reward is tolerated and leaves it as `None`.
    let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
    info!("Block reward fetched: {:?}", block_reward);
    let mut membership = EpochCommittees::new_stake(
        network_config.config.known_nodes_with_stake.clone(),
        network_config.config.known_da_nodes.clone(),
        block_reward,
        fetcher,
        epoch_height,
    );
    info!("Membership created. Reloading stake");
    membership.reload_stake(RECENT_STAKE_TABLES_LIMIT).await;
    info!("Stake reloaded");

    // Log-only check; it never fails startup.
    check_cliquenet_info_registered(
        &membership,
        &validator_config.public_key,
        genesis.base_version,
        genesis.chain_config.stake_table_contract,
        &l1_client,
    )
    .await;

    // From here on persistence is shared behind an `Arc`.
    let persistence = Arc::new(persistence);
    let coordinator = EpochMembershipCoordinator::new(
        membership,
        network_config.config.epoch_height,
        &persistence,
    );

    let epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));

    let instance_state = NodeState {
        chain_config: genesis.chain_config,
        genesis_chain_config,
        l1_client,
        genesis_header: genesis.header,
        genesis_state,
        l1_genesis: Some(l1_genesis),
        node_id: node_index,
        upgrades: genesis.upgrades,
        current_version: genesis.base_version,
        epoch_height: Some(epoch_height),
        state_catchup: Arc::new(state_catchup_providers.clone()),
        coordinator: coordinator.clone(),
        genesis_version: genesis.genesis_version,
        epoch_start_block: genesis.epoch_start_block.unwrap_or_default(),
        epoch_rewards_calculator,
        // Single-entry caches; only the finalized HotShot height has a time-to-live (30s).
        light_client_contract_address: Cache::builder().max_capacity(1).build(),
        token_contract_address: Cache::builder().max_capacity(1).build(),
        finalized_hotshot_height: Cache::builder()
            .max_capacity(1)
            .time_to_live(Duration::from_secs(30))
            .build(),
    };

    let combined_network = {
        info!("Initializing Libp2p network");
        let p2p_network = Libp2pNetwork::from_config(
            network_config.clone(),
            persistence.clone(),
            gossip_config,
            request_response_config,
            libp2p_bind_address,
            libp2p_external_addresses,
            &validator_config.public_key,
            &validator_config.private_key,
            hotshot::traits::implementations::Libp2pMetricsValue::new(&*metrics),
        )
        .await
        .with_context(|| {
            format!(
                "Failed to create libp2p network on node {node_index}; binding to {:?}",
                network_params.libp2p_bind_address
            )
        })?;

        info!("Libp2p network initialized");

        // Proceed as soon as either of the two networks reports ready.
        tracing::warn!("Waiting for at least one connection to be initialized");
        select! {
            _ = cdn_network.wait_for_ready() => {
                tracing::warn!("CDN connection initialized");
            },
            _ = p2p_network.wait_for_ready() => {
                tracing::warn!("P2P connection initialized");
            },
        };

        CombinedNetworks::new(cdn_network, p2p_network, Some(Duration::from_secs(1)))
    };

    // Cliquenet is created with an empty peer list here.
    let cliquenet = Cliquenet::create(
        "espresso",
        pub_key,
        network_params.x25519_secret_key.into(),
        network_params.cliquenet_bind_addr.clone(),
        [],
        UpgradeLock::new(version_upgrade),
        clone_box(&*metrics),
    )
    .await?;

    let network = Arc::new(combined_network);

    let mut ctx = SequencerContext::init(
        network_config,
        version_upgrade,
        validator_config,
        coordinator,
        instance_state,
        storage,
        state_catchup_providers,
        persistence,
        network.clone(),
        cliquenet,
        Some(network_params.state_relay_server_url),
        &*metrics,
        genesis.stake_table.capacity,
        event_consumer,
        proposal_fetcher_config,
        network_params.bootstrap_epoch_catchup_timeout,
        network_params.new_protocol_consensus_gc_interval,
    )
    .await?;

    // Only set when the config was obtained from the orchestrator in this run.
    if wait_for_orchestrator {
        ctx = ctx.wait_for_orchestrator(orchestrator_client);
    }

    Ok(ctx)
}
843
844pub fn empty_builder_commitment() -> BuilderCommitment {
845 BuilderCommitment::from_bytes([])
846}
847
848async fn check_cliquenet_info_registered(
853 membership: &EpochCommittees,
854 pub_key: &BLSPubKey,
855 current_version: vbs::version::Version,
856 stake_table_contract: Option<alloy::primitives::Address>,
857 l1_client: &espresso_types::v0::L1Client,
858) {
859 if current_version != versions::EPOCH_REWARD_VERSION {
860 return;
861 }
862 let Some(addr) = stake_table_contract else {
863 return;
864 };
865 let stake_table =
866 hotshot_contract_adapter::sol_types::StakeTableV3::new(addr, l1_client.provider.clone());
867 let mut major = None;
868 let mut last_err = None;
869 for attempt in 1..=3 {
870 match stake_table.getVersion().call().await {
871 Ok(v) => {
872 major = Some(v.majorVersion);
873 break;
874 },
875 Err(e) => {
876 tracing::warn!(attempt, %e, "failed to read StakeTable getVersion(), retrying");
877 last_err = Some(e);
878 if attempt < 3 {
879 tokio::time::sleep(Duration::from_secs(1)).await;
880 }
881 },
882 }
883 }
884 let Some(major) = major else {
885 tracing::warn!(
886 err = ?last_err,
887 "could not read StakeTable getVersion() after 3 attempts; skipping check"
888 );
889 return;
890 };
891 if major < 3 {
892 tracing::info!(
893 major,
894 "StakeTableV3 not deployed; skipping network-info registration check"
895 );
896 return;
897 }
898 let Some(cfg) = membership.latest_peer_config(pub_key) else {
899 return;
900 };
901 if cfg.connect_info.is_some() {
902 return;
903 }
904 tracing::error!(
905 bls_key = %pub_key,
906 "Validator has no x25519 key or p2p address registered on-chain. After the CLIQUENET \
907 upgrade activates, the validator will be excluded from the active set and stop earning \
908 rewards. To fix: (1) generate an x25519 keypair if needed (`keygen --scheme x25519 \
909 --out keys.env`), then (2) register it on-chain (`staking-cli update-network-config \
910 --x25519-key <ESPRESSO_NODE_PUBLIC_X25519_KEY> --p2p-addr <host:port>`)."
911 );
912}
913
914#[cfg(any(test, feature = "testing"))]
915pub mod testing {
916 use std::{
917 cmp::max,
918 collections::{BTreeMap, HashMap},
919 net::Ipv4Addr,
920 time::Duration,
921 };
922
923 use alloy::{
924 network::EthereumWallet,
925 node_bindings::{Anvil, AnvilInstance},
926 primitives::{Address, U256},
927 providers::{
928 Provider, ProviderBuilder, RootProvider,
929 fillers::{
930 BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
931 },
932 layers::AnvilProvider,
933 },
934 signers::{k256::ecdsa::SigningKey, local::LocalSigner},
935 };
936 use catchup::NullStateCatchup;
937 use committable::Committable;
938 use espresso_contract_deployer::{
939 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
940 network_config::light_client_genesis_from_stake_table,
941 };
942 use espresso_types::{
943 EpochVersion, Event, FeeAccount, L1Client, NetworkConfig, PubKey, SeqTypes, Transaction,
944 Upgrade, UpgradeMap,
945 eth_signature_key::EthKeyPair,
946 v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup},
947 };
948 use futures::{
949 future::join_all,
950 stream::{Stream, StreamExt},
951 };
952 use hotshot::{
953 traits::{
954 BlockPayload,
955 implementations::{MasterMap, MemoryNetwork},
956 },
957 types::EventType,
958 };
959 use hotshot_builder_refactored::service::{
960 BuilderConfig as LegacyBuilderConfig, GlobalState as LegacyGlobalState,
961 };
962 use hotshot_testing::block_builder::{
963 BuilderTask, SimpleBuilderImplementation, TestBuilderImplementation,
964 };
965 use hotshot_types::{
966 HotShotConfig, PeerConfig,
967 data::EpochNumber,
968 event::LeafInfo,
969 light_client::StateKeyPair,
970 message::UpgradeLock,
971 new_protocol::CoordinatorEvent,
972 traits::{
973 EncodeBytes, block_contents::BlockHeader, metrics::NoMetrics, network::Topic,
974 signature_key::BuilderSignatureKey,
975 },
976 };
977 use rand::SeedableRng as _;
978 use rand_chacha::ChaCha20Rng;
979 use staking_cli::demo::{DelegationConfig, StakingKeySet, StakingTransactions};
980 use test_utils::reserve_tcp_port;
981 use tokio::spawn;
982 use vbs::version::{StaticVersionType, Version};
983 use versions::EPOCH_VERSION;
984
985 use super::*;
986 use crate::{
987 catchup::ParallelStateCatchup,
988 persistence::no_storage::{self, NoStorage},
989 };
990
    /// Stake table capacity used when deriving the light client genesis in test setups.
    const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
    /// Transaction channel capacity given to the legacy builder in tests.
    const BUILDER_CHANNEL_CAPACITY_FOR_TEST: usize = 128;
    /// Concrete provider type held by `TestConfigBuilder`: an `AnvilProvider` wrapping a
    /// `FillProvider` with gas, blob-gas, nonce and chain-id fillers on top of a `RootProvider`.
    type AnvilFillProvider = AnvilProvider<
        FillProvider<
            JoinFill<
                alloy::providers::Identity,
                JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
            >,
            RootProvider,
        >,
    >;
    /// Test-only [`BuilderTask`] wrapper around the legacy builder's shared global state.
    struct LegacyBuilderImplementation {
        /// State whose event loop is driven once the task is started.
        global_state: Arc<LegacyGlobalState<SeqTypes>>,
    }
1005
1006 impl BuilderTask<SeqTypes> for LegacyBuilderImplementation {
1007 fn start(
1008 self: Box<Self>,
1009 stream: Box<
1010 dyn futures::prelude::Stream<Item = hotshot::types::Event<SeqTypes>>
1011 + std::marker::Unpin
1012 + Send
1013 + 'static,
1014 >,
1015 ) {
1016 spawn(async move {
1017 let res = self.global_state.start_event_loop(stream).await;
1018 tracing::error!(?res, "testing legacy builder service exited");
1019 });
1020 }
1021 }
1022
1023 pub async fn run_legacy_builder<const NUM_NODES: usize>(
1024 port: Option<u16>,
1025 max_block_size: Option<u64>,
1026 ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
1027 let builder_key_pair = TestConfig::<0>::builder_key();
1028 let port = match port {
1029 Some(p) => p,
1030 None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
1031 };
1032
1033 let url: Url = format!("http://localhost:{port}")
1035 .parse()
1036 .expect("Failed to parse builder URL");
1037
1038 let global_state = LegacyGlobalState::new(
1040 LegacyBuilderConfig {
1041 builder_keys: (builder_key_pair.fee_account(), builder_key_pair),
1042 max_api_waiting_time: Duration::from_secs(1),
1043 max_block_size_increment_period: Duration::from_secs(60),
1044 maximize_txn_capture_timeout: Duration::from_millis(100),
1045 txn_garbage_collect_duration: Duration::from_secs(60),
1046 txn_channel_capacity: BUILDER_CHANNEL_CAPACITY_FOR_TEST,
1047 tx_status_cache_capacity: 81920,
1048 base_fee: 10,
1049 },
1050 NodeState::default(),
1051 max_block_size.unwrap_or(300),
1052 NUM_NODES,
1053 );
1054
1055 let app = Arc::clone(&global_state)
1057 .into_app()
1058 .expect("Failed to create builder tide-disco app");
1059
1060 spawn(async move {
1061 app.serve(
1062 format!("http://0.0.0.0:{port}")
1063 .parse::<Url>()
1064 .expect("Failed to parse builder listener"),
1065 EpochVersion::instance(),
1066 )
1067 .await
1068 });
1069
1070 (Box::new(LegacyBuilderImplementation { global_state }), url)
1072 }
1073
1074 pub async fn run_test_builder<const NUM_NODES: usize>(
1075 port: Option<u16>,
1076 ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
1077 let port = match port {
1078 Some(p) => p,
1079 None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
1080 };
1081
1082 let url: Url = format!("http://localhost:{port}")
1084 .parse()
1085 .expect("Failed to parse builder URL");
1086 tracing::info!("Starting test builder on {url}");
1087
1088 let task = <SimpleBuilderImplementation as TestBuilderImplementation<SeqTypes>>::start(
1089 NUM_NODES,
1090 format!("http://0.0.0.0:{port}")
1091 .parse()
1092 .expect("Failed to parse builder listener"),
1093 (),
1094 HashMap::new(),
1095 )
1096 .await;
1097
1098 (task, url)
1099 }
1100
    /// Builder for the test network configuration of `NUM_NODES` nodes.
    pub struct TestConfigBuilder<const NUM_NODES: usize> {
        /// HotShot config; adjusted by upgrades passed to `upgrades`.
        config: HotShotConfig<SeqTypes>,
        /// BLS private keys, used when deriving the validators' staking key sets.
        priv_keys: Vec<BLSPrivKey>,
        /// State key pairs, used when deriving the validators' staking key sets.
        state_key_pairs: Vec<StateKeyPair>,
        // NOTE(review): presumably the shared map backing the in-memory test networks — confirm.
        master_map: Arc<MasterMap<PubKey>>,
        /// L1 endpoint; set from the Anvil instance or explicitly via `l1_url`.
        l1_url: Url,
        /// L1 client options, replaced via `l1_opt`.
        l1_opt: L1ClientOptions,
        /// Provider that keeps the Anvil instance alive; cleared when `l1_url` is set directly.
        anvil_provider: Option<AnvilFillProvider>,
        /// Signer used for the wallet of the contract deployer.
        signer: LocalSigner<SigningKey>,
        /// Optional state relay URL, set via `state_relay_url`.
        state_relay_url: Option<Url>,
        /// Optional fixed builder port, set via `builder_port`.
        builder_port: Option<u16>,
        /// Upgrades keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
1114
1115 pub fn staking_priv_keys(
1116 priv_keys: &[BLSPrivKey],
1117 state_key_pairs: &[StateKeyPair],
1118 num_nodes: usize,
1119 ) -> Vec<StakingKeySet> {
1120 let seed = [42u8; 32];
1121 let mut rng = ChaCha20Rng::from_seed(seed); let eth_key_pairs = (0..num_nodes).map(|_| SigningKey::random(&mut rng).into());
1123 eth_key_pairs
1124 .zip(priv_keys.iter())
1125 .zip(state_key_pairs.iter())
1126 .map(|((eth, bls), state)| StakingKeySet {
1127 signer: eth,
1128 bls: bls.clone().into(),
1129 state: state.clone(),
1130 x25519: x25519::Keypair::generate().expect("x25519 keypair"),
1131 p2p_addr: "127.0.0.1:8080".parse().unwrap(),
1132 })
1133 .collect()
1134 }
1135
1136 impl<const NUM_NODES: usize> TestConfigBuilder<NUM_NODES> {
1137 pub fn builder_port(mut self, builder_port: Option<u16>) -> Self {
1138 self.builder_port = builder_port;
1139 self
1140 }
1141
1142 pub fn state_relay_url(mut self, url: Url) -> Self {
1143 self.state_relay_url = Some(url);
1144 self
1145 }
1146
1147 pub fn anvil_provider(mut self, anvil: AnvilInstance) -> Self {
1152 self.l1_url = anvil.endpoint().parse().unwrap();
1153 let l1_client = L1Client::anvil(&anvil).expect("create l1 client");
1154 let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1155 self.anvil_provider = Some(anvil_provider);
1156 self
1157 }
1158
1159 pub fn l1_url(mut self, l1_url: Url) -> Self {
1162 self.anvil_provider = None;
1163 self.l1_url = l1_url;
1164 self
1165 }
1166
1167 pub fn l1_opt(mut self, opt: L1ClientOptions) -> Self {
1168 self.l1_opt = opt;
1169 self
1170 }
1171
1172 pub fn signer(mut self, signer: LocalSigner<SigningKey>) -> Self {
1173 self.signer = signer;
1174 self
1175 }
1176
1177 pub fn upgrades(mut self, v: Version, upgrades: BTreeMap<Version, Upgrade>) -> Self {
1178 let upgrade = upgrades.get(&v).unwrap();
1179 upgrade.set_hotshot_config_parameters(&mut self.config);
1180 self.upgrades = upgrades;
1181 self
1182 }
1183
1184 pub async fn set_upgrades(mut self, version: Version) -> Self {
1187 let upgrade = match version {
1188 version if version >= EPOCH_VERSION => {
1189 tracing::debug!(?version, "upgrade version");
1190 let blocks_per_epoch = self.config.epoch_height;
1191 let epoch_start_block = self.config.epoch_start_block;
1192
1193 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1194 &self.config.hotshot_stake_table(),
1195 STAKE_TABLE_CAPACITY_FOR_TEST,
1196 )
1197 .unwrap();
1198
1199 let validators =
1200 staking_priv_keys(&self.priv_keys, &self.state_key_pairs, NUM_NODES);
1201
1202 let deployer = ProviderBuilder::new()
1203 .wallet(EthereumWallet::from(self.signer.clone()))
1204 .connect_http(self.l1_url.clone());
1205
1206 let mut contracts = Contracts::new();
1207 let args = DeployerArgsBuilder::default()
1208 .deployer(deployer.clone())
1209 .rpc_url(self.l1_url.clone())
1210 .mock_light_client(true)
1211 .genesis_lc_state(genesis_state)
1212 .genesis_st_state(genesis_stake)
1213 .blocks_per_epoch(blocks_per_epoch)
1214 .epoch_start_block(epoch_start_block)
1215 .exit_escrow_period(U256::from(max(
1216 blocks_per_epoch * 15 + 100,
1217 DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1218 )))
1219 .multisig_pauser(self.signer.address())
1220 .token_name("Espresso".to_string())
1221 .token_symbol("ESP".to_string())
1222 .initial_token_supply(U256::from(3590000000u64))
1223 .ops_timelock_delay(U256::from(0))
1224 .ops_timelock_admin(self.signer.address())
1225 .ops_timelock_proposers(vec![self.signer.address()])
1226 .ops_timelock_executors(vec![self.signer.address()])
1227 .safe_exit_timelock_delay(U256::from(10))
1228 .safe_exit_timelock_admin(self.signer.address())
1229 .safe_exit_timelock_proposers(vec![self.signer.address()])
1230 .safe_exit_timelock_executors(vec![self.signer.address()])
1231 .build()
1232 .unwrap();
1233 args.deploy_to_stake_table_v3(&mut contracts)
1234 .await
1235 .expect("failed to deploy all contracts");
1236
1237 let st_addr = contracts
1238 .address(Contract::StakeTableProxy)
1239 .expect("StakeTableProxy address not found");
1240 StakingTransactions::create(
1241 self.l1_url.clone(),
1242 &deployer,
1243 st_addr,
1244 validators,
1245 None,
1246 DelegationConfig::default(),
1247 )
1248 .await
1249 .expect("stake table setup failed")
1250 .apply_all()
1251 .await
1252 .expect("send all txns failed");
1253
1254 Upgrade::pos_view_based(st_addr)
1255 },
1256 _ => panic!("Upgrade not configured for version {version:?}"),
1257 };
1258
1259 let mut upgrades = std::collections::BTreeMap::new();
1260 upgrade.set_hotshot_config_parameters(&mut self.config);
1261 upgrades.insert(version, upgrade);
1262
1263 self.upgrades = upgrades;
1264 self
1265 }
1266
1267 pub fn epoch_height(mut self, epoch_height: u64) -> Self {
1268 self.config.epoch_height = epoch_height;
1269 self
1270 }
1271
1272 pub fn epoch_start_block(mut self, start_block: u64) -> Self {
1273 self.config.epoch_start_block = start_block;
1274 self
1275 }
1276
1277 pub fn build(self) -> TestConfig<NUM_NODES> {
1278 TestConfig {
1279 config: self.config,
1280 priv_keys: self.priv_keys,
1281 state_key_pairs: self.state_key_pairs,
1282 master_map: self.master_map,
1283 l1_url: self.l1_url,
1284 l1_opt: self.l1_opt,
1285 signer: self.signer,
1286 state_relay_url: self.state_relay_url,
1287 builder_port: self.builder_port,
1288 upgrades: self.upgrades,
1289 anvil_provider: self.anvil_provider,
1290 }
1291 }
1292
1293 pub fn stake_table_capacity(mut self, stake_table_capacity: usize) -> Self {
1294 self.config.stake_table_capacity = stake_table_capacity;
1295 self
1296 }
1297 }
1298
1299 impl<const NUM_NODES: usize> Default for TestConfigBuilder<NUM_NODES> {
1300 fn default() -> Self {
1301 let num_nodes = NUM_NODES;
1302
1303 let seed = [0; 32];
1305 let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..num_nodes)
1306 .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed(seed, i as u64))
1307 .unzip();
1308 let state_key_pairs = (0..num_nodes)
1309 .map(|i| StateKeyPair::generate_from_seed_indexed(seed, i as u64))
1310 .collect::<Vec<_>>();
1311 let known_nodes_with_stake = pub_keys
1312 .iter()
1313 .zip(&state_key_pairs)
1314 .map(|(pub_key, state_key_pair)| PeerConfig::<SeqTypes> {
1315 stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
1316 state_ver_key: state_key_pair.ver_key(),
1317 connect_info: None,
1318 })
1319 .collect::<Vec<_>>();
1320
1321 let master_map = MasterMap::new();
1322
1323 let builder_port = reserve_tcp_port().unwrap();
1324
1325 let config: HotShotConfig<SeqTypes> = HotShotConfig {
1326 fixed_leader_for_gpuvid: 0,
1327 num_nodes_with_stake: num_nodes.try_into().unwrap(),
1328 known_da_nodes: known_nodes_with_stake.clone(),
1329 da_committees: Default::default(),
1330 known_nodes_with_stake: known_nodes_with_stake.clone(),
1331 next_view_timeout: Duration::from_secs(5).as_millis() as u64,
1332 num_bootstrap: 1usize,
1333 da_staked_committee_size: num_nodes,
1334 view_sync_timeout: Duration::from_secs(1),
1335 data_request_delay: Duration::from_secs(1),
1336 builder_urls: vec1::vec1![
1337 Url::parse(&format!("http://127.0.0.1:{builder_port}")).unwrap()
1338 ],
1339 builder_timeout: Duration::from_secs(1),
1340 start_threshold: (
1341 known_nodes_with_stake.clone().len() as u64,
1342 known_nodes_with_stake.clone().len() as u64,
1343 ),
1344 start_proposing_view: 0,
1345 stop_proposing_view: 0,
1346 start_voting_view: 0,
1347 stop_voting_view: 0,
1348 start_proposing_time: 0,
1349 start_voting_time: 0,
1350 stop_proposing_time: 0,
1351 stop_voting_time: 0,
1352 epoch_height: 30,
1353 epoch_start_block: 1,
1354 stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
1355 drb_difficulty: 10,
1356 drb_upgrade_difficulty: 20,
1357 };
1358
1359 let anvil = Anvil::new()
1360 .args(["--slots-in-an-epoch", "0", "--balance", "1000000"])
1361 .spawn();
1362
1363 let l1_client = L1Client::anvil(&anvil).expect("failed to create l1 client");
1364 let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1365
1366 let l1_signer_key = anvil_provider.anvil().keys()[0].clone();
1367 let signer = LocalSigner::from(l1_signer_key);
1368
1369 Self {
1370 config,
1371 priv_keys,
1372 state_key_pairs,
1373 master_map,
1374 l1_url: anvil_provider.anvil().endpoint().parse().unwrap(),
1375 l1_opt: L1ClientOptions {
1376 stake_table_update_interval: Duration::from_secs(5),
1377 l1_events_max_block_range: 1000,
1378 l1_polling_interval: Duration::from_secs(1),
1379 subscription_timeout: Duration::from_secs(5),
1380 ..Default::default()
1381 },
1382 anvil_provider: Some(anvil_provider),
1383 signer,
1384 state_relay_url: None,
1385 builder_port: None,
1386 upgrades: Default::default(),
1387 }
1388 }
1389 }
1390
/// Configuration for an in-process test network, produced by `TestConfigBuilder::build`.
///
/// Holds the per-node keys together with the shared HotShot, L1 and upgrade settings
/// that `init_node` uses to start each node.
#[derive(Clone)]
pub struct TestConfig<const NUM_NODES: usize> {
    /// HotShot configuration that is cloned into every node.
    config: HotShotConfig<SeqTypes>,
    /// BLS private keys, indexed by node; its length is reported as the node count.
    priv_keys: Vec<BLSPrivKey>,
    /// State key pairs, indexed by node like `priv_keys`.
    state_key_pairs: Vec<StateKeyPair>,
    /// Shared map handed to each node's `MemoryNetwork`.
    master_map: Arc<MasterMap<PubKey>>,
    /// URL of the L1 RPC endpoint.
    l1_url: Url,
    /// Options used when connecting each node's L1 client.
    l1_opt: L1ClientOptions,
    /// Provider wrapping the Anvil instance behind `l1_url`, if there is one.
    anvil_provider: Option<AnvilFillProvider>,
    /// L1 signer.
    signer: LocalSigner<SigningKey>,
    /// Optional state relay URL forwarded to each node on initialization.
    state_relay_url: Option<Url>,
    /// Optional builder port.
    builder_port: Option<u16>,
    /// Configured upgrades, keyed by version.
    upgrades: BTreeMap<Version, Upgrade>,
}
1405
1406 impl<const NUM_NODES: usize> TestConfig<NUM_NODES> {
1407 pub fn num_nodes(&self) -> usize {
1408 self.priv_keys.len()
1409 }
1410
1411 pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1412 &self.config
1413 }
1414
1415 pub fn set_builder_urls(&mut self, builder_urls: vec1::Vec1<Url>) {
1416 self.config.builder_urls = builder_urls;
1417 }
1418
1419 pub fn builder_port(&self) -> Option<u16> {
1420 self.builder_port
1421 }
1422
1423 pub fn signer(&self) -> LocalSigner<SigningKey> {
1424 self.signer.clone()
1425 }
1426
1427 pub fn l1_url(&self) -> Url {
1428 self.l1_url.clone()
1429 }
1430
1431 pub fn anvil(&self) -> Option<&AnvilFillProvider> {
1432 self.anvil_provider.as_ref()
1433 }
1434
1435 pub fn get_upgrade_map(&self) -> UpgradeMap {
1436 self.upgrades.clone().into()
1437 }
1438
1439 pub fn upgrades(&self) -> BTreeMap<Version, Upgrade> {
1440 self.upgrades.clone()
1441 }
1442
1443 pub fn staking_priv_keys(&self) -> Vec<StakingKeySet> {
1444 staking_priv_keys(&self.priv_keys, &self.state_key_pairs, self.num_nodes())
1445 }
1446
1447 pub fn validator_providers(
1448 &self,
1449 ) -> Vec<(Address, impl Provider + Clone + use<NUM_NODES>)> {
1450 self.staking_priv_keys()
1451 .into_iter()
1452 .map(|key_set| {
1453 (
1454 key_set.signer.address(),
1455 ProviderBuilder::new()
1456 .wallet(EthereumWallet::from(key_set.signer))
1457 .connect_http(self.l1_url.clone()),
1458 )
1459 })
1460 .collect()
1461 }
1462
1463 pub async fn init_nodes(
1464 &self,
1465 upgrade: versions::Upgrade,
1466 ) -> Vec<SequencerContext<network::Memory, NoStorage>> {
1467 join_all((0..self.num_nodes()).map(|i| async move {
1468 self.init_node(
1469 i,
1470 ValidatedState::default(),
1471 no_storage::Options,
1472 Some(NullStateCatchup::default()),
1473 None,
1474 &NoMetrics,
1475 STAKE_TABLE_CAPACITY_FOR_TEST,
1476 NullEventConsumer,
1477 upgrade,
1478 Default::default(),
1479 )
1480 .await
1481 }))
1482 .await
1483 }
1484
1485 pub fn known_nodes_with_stake(&self) -> &[PeerConfig<SeqTypes>] {
1486 &self.config.known_nodes_with_stake
1487 }
1488
1489 #[allow(clippy::too_many_arguments)]
1490 pub async fn init_node<P: PersistenceOptions>(
1491 &self,
1492 i: usize,
1493 mut state: ValidatedState,
1494 mut persistence_opt: P,
1495 state_peers: Option<impl StateCatchup + 'static>,
1496 storage: Option<RequestResponseStorage>,
1497 metrics: &dyn Metrics,
1498 stake_table_capacity: usize,
1499 event_consumer: impl EventConsumer + 'static,
1500 upgrade: versions::Upgrade,
1501 upgrades: BTreeMap<Version, Upgrade>,
1502 ) -> SequencerContext<network::Memory, P::Persistence> {
1503 let config = self.config.clone();
1504 let my_peer_config = &config.known_nodes_with_stake[i];
1505 let is_da = config.known_da_nodes.contains(my_peer_config);
1506
1507 let validator_config = ValidatorConfig {
1509 public_key: my_peer_config.stake_table_entry.stake_key,
1510 private_key: self.priv_keys[i].clone(),
1511 stake_value: my_peer_config.stake_table_entry.stake_amount,
1512 state_public_key: self.state_key_pairs[i].ver_key(),
1513 state_private_key: self.state_key_pairs[i].sign_key(),
1514 is_da,
1515 x25519_keypair: None,
1516 p2p_addr: None,
1517 };
1518
1519 let topics = if is_da {
1520 vec![Topic::Global, Topic::Da]
1521 } else {
1522 vec![Topic::Global]
1523 };
1524
1525 let network = Arc::new(MemoryNetwork::new(
1526 &my_peer_config.stake_table_entry.stake_key,
1527 &self.master_map,
1528 &topics,
1529 None,
1530 ));
1531
1532 let builder_account = Self::builder_key().fee_account();
1534 tracing::info!(%builder_account, "prefunding builder account");
1535 state.prefund_account(builder_account, U256::MAX.into());
1536
1537 let persistence = persistence_opt.create().await.unwrap();
1538
1539 let chain_config = state.chain_config.resolve().unwrap_or_default();
1540
1541 let catchup_providers = ParallelStateCatchup::new(&[], Duration::from_secs(5));
1543
1544 if let Some(state_peers) = state_peers {
1546 catchup_providers.add_provider(Arc::new(state_peers));
1547 }
1548
1549 match persistence
1551 .clone()
1552 .into_catchup_provider(BackoffParams::default())
1553 {
1554 Ok(local_catchup) => {
1555 catchup_providers.add_provider(local_catchup);
1556 },
1557 Err(e) => {
1558 tracing::warn!(
1559 "Failed to create local catchup provider: {e:#}. Only using remote \
1560 catchup."
1561 );
1562 },
1563 };
1564
1565 let l1_client = self
1566 .l1_opt
1567 .clone()
1568 .connect(vec![self.l1_url.clone()])
1569 .expect("failed to create L1 client");
1570 l1_client.spawn_tasks().await;
1571
1572 let fetcher = Fetcher::new(
1573 Arc::new(catchup_providers.clone()),
1574 Arc::new(Mutex::new(persistence.clone())),
1575 l1_client.clone(),
1576 chain_config,
1577 );
1578 fetcher.spawn_update_loop().await;
1579
1580 let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
1581 let mut membership = EpochCommittees::new_stake(
1582 config.known_nodes_with_stake.clone(),
1583 config.known_da_nodes.clone(),
1584 block_reward,
1585 fetcher,
1586 config.epoch_height,
1587 );
1588 membership.reload_stake(50).await;
1589
1590 let membership = Arc::new(membership);
1591 let persistence = Arc::new(persistence);
1592
1593 let coordinator = EpochMembershipCoordinator::new(
1594 membership,
1595 config.epoch_height,
1596 &persistence.clone(),
1597 );
1598
1599 let node_state = NodeState::new(
1600 i as u64,
1601 chain_config,
1602 l1_client,
1603 Arc::new(catchup_providers.clone()),
1604 upgrade.base,
1605 coordinator.clone(),
1606 upgrade.base,
1607 )
1608 .with_current_version(upgrade.base)
1609 .with_genesis(state)
1610 .with_epoch_height(config.epoch_height)
1611 .with_upgrades(upgrades)
1612 .with_epoch_start_block(config.epoch_start_block);
1613
1614 tracing::info!(
1615 i,
1616 key = %my_peer_config.stake_table_entry.stake_key,
1617 state_key = %my_peer_config.state_ver_key,
1618 "starting node",
1619 );
1620
1621 let coordinator_network = {
1622 let keypair = x25519::Keypair::derive_from::<PubKey>(&self.priv_keys[i])
1623 .expect("keypair derivation should succeed");
1624 let port = test_utils::reserve_tcp_port()
1625 .expect("OS should have ephemeral ports available");
1626 let addr = NetAddr::Inet(Ipv4Addr::LOCALHOST.into(), port);
1627 let lock = UpgradeLock::<SeqTypes>::new(upgrade);
1628 Cliquenet::create(
1629 "test-coordinator",
1630 my_peer_config.stake_table_entry.stake_key,
1631 keypair,
1632 addr,
1633 [],
1634 lock,
1635 Box::new(NoMetrics),
1636 )
1637 .await
1638 .expect("cliquenet creation should succeed")
1639 };
1640
1641 SequencerContext::init(
1642 NetworkConfig {
1643 config,
1644 ..Default::default()
1647 },
1648 upgrade,
1649 validator_config,
1650 coordinator,
1651 node_state,
1652 storage,
1653 catchup_providers,
1654 persistence,
1655 network,
1656 coordinator_network,
1657 self.state_relay_url.clone(),
1658 metrics,
1659 stake_table_capacity,
1660 event_consumer,
1661 Default::default(),
1662 Duration::from_secs(2),
1663 NonZeroU64::new(100).unwrap(),
1664 )
1665 .await
1666 .unwrap()
1667 }
1668
1669 pub fn builder_key() -> EthKeyPair {
1670 FeeAccount::generated_from_seed_indexed([1; 32], 0).1
1671 }
1672 }
1673
1674 pub async fn wait_for_decide_on_handle(
1677 events: &mut (impl Stream<Item = CoordinatorEvent<SeqTypes>> + Unpin),
1678 submitted_txn: &Transaction,
1679 ) -> (u64, usize) {
1680 let commitment = submitted_txn.commit();
1681
1682 loop {
1684 let event = events.next().await.unwrap();
1685 tracing::info!("Received event from handle: {event:?}");
1686
1687 if let CoordinatorEvent::LegacyEvent(Event {
1688 event: EventType::Decide { leaf_chain, .. },
1689 ..
1690 }) = event
1691 {
1692 if let Some((height, size)) =
1693 leaf_chain.iter().find_map(|LeafInfo { leaf, .. }| {
1694 if leaf
1695 .block_payload()
1696 .as_ref()?
1697 .transaction_commitments(leaf.block_header().metadata())
1698 .contains(&commitment)
1699 {
1700 let size = leaf.block_payload().unwrap().encode().len();
1701 Some((leaf.block_header().block_number(), size))
1702 } else {
1703 None
1704 }
1705 })
1706 {
1707 tracing::info!(height, "transaction {commitment} sequenced");
1708 return (height, size);
1709 }
1710 } else {
1711 }
1713 }
1714 }
1715
1716 pub async fn wait_for_epochs(
1719 events: &mut (impl futures::Stream<Item = CoordinatorEvent<SeqTypes>> + std::marker::Unpin),
1720 epoch_height: u64,
1721 target_epoch: u64,
1722 ) {
1723 tracing::info!(target_epoch, "waiting for epoch");
1724 while let Some(event) = events.next().await {
1725 if let CoordinatorEvent::LegacyEvent(Event {
1726 event: EventType::Decide { leaf_chain, .. },
1727 ..
1728 }) = event
1729 {
1730 let leaf = leaf_chain[0].leaf.clone();
1731 let epoch = leaf.epoch(epoch_height);
1732 tracing::debug!(
1733 "Node decided at height: {}, epoch: {epoch:?}",
1734 leaf.height(),
1735 );
1736
1737 if epoch > Some(EpochNumber::new(target_epoch)) {
1738 break;
1739 }
1740 }
1741 }
1742 tracing::info!(target_epoch, "epoch started");
1743 }
1744}
1745
#[cfg(test)]
mod test {
    use alloy::node_bindings::Anvil;
    use espresso_types::{Header, MOCK_SEQUENCER_VERSIONS, NamespaceId, Payload, Transaction};
    use futures::StreamExt;
    use hotshot::types::{Event, EventType};
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::{
        event::LeafInfo,
        new_protocol::CoordinatorEvent,
        traits::block_contents::{BlockHeader, BlockPayload},
    };
    use testing::{TestConfigBuilder, wait_for_decide_on_handle};

    use self::testing::run_test_builder;
    use super::*;

    /// Starts a five-node network with a test builder, submits one transaction to
    /// the first node and waits until it has been decided.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_skeleton_instantiation() {
        // `anvil` must stay bound for the whole test so the L1 keeps running.
        let anvil = Anvil::new().spawn();
        let l1_url = anvil.endpoint_url();
        const NUM_NODES: usize = 5;
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(l1_url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;
        let first_node = &handles[0];

        // The builder follows the legacy consensus event stream of the first node.
        builder_task.start(Box::new(
            first_node
                .consensus_handle()
                .legacy_consensus()
                .read()
                .await
                .event_stream(),
        ));

        let mut events = first_node.event_stream();

        for handle in &handles {
            handle.start_consensus().await;
        }

        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3]);
        handles[0]
            .submit_transaction(txn.clone())
            .await
            .expect("Failed to submit transaction");
        tracing::info!("Submitted transaction to handle: {txn:?}");

        wait_for_decide_on_handle(&mut events, &txn).await;
    }

    /// Runs a five-node network until height 30 is decided, checking for every
    /// decided header that the height increases by exactly one and that the
    /// timestamp, L1 head and L1 finalized block never go backwards.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_header_invariants() {
        let success_height = 30;
        // `anvil` must stay bound for the whole test so the L1 keeps running.
        let anvil = Anvil::new().spawn();
        let l1_url = anvil.endpoint_url();
        const NUM_NODES: usize = 5;
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(l1_url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;
        let first_node = &handles[0];

        let mut events = first_node.event_stream();

        // The builder follows the legacy consensus event stream of the first node.
        builder_task.start(Box::new(
            first_node
                .consensus_handle()
                .legacy_consensus()
                .read()
                .await
                .event_stream(),
        ));

        for handle in &handles {
            handle.start_consensus().await;
        }

        // Start from a mock genesis header; it is replaced by the decided header at
        // height 0 if one is observed.
        let mut parent = {
            let genesis_state = NodeState::mock();
            let (genesis_payload, genesis_ns_table) =
                Payload::from_transactions([], &ValidatedState::default(), &genesis_state)
                    .await
                    .unwrap();

            Header::genesis(
                &genesis_state,
                genesis_payload,
                &genesis_ns_table,
                TEST_VERSIONS.test.base,
            )
        };

        loop {
            let event = events.next().await.unwrap();
            tracing::info!("Received event from handle: {event:?}");
            let CoordinatorEvent::LegacyEvent(Event {
                event: EventType::Decide { leaf_chain, .. },
                ..
            }) = event
            else {
                continue;
            };
            tracing::info!("Got decide {leaf_chain:?}");

            // The chain is walked in reverse so that each header is compared against
            // the one processed just before it.
            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
                let header = leaf.block_header().clone();
                // A header at height 0 has no parent to compare against.
                if header.height() != 0 {
                    assert_eq!(header.height(), parent.height() + 1);
                    assert!(header.timestamp() >= parent.timestamp());
                    assert!(header.l1_head() >= parent.l1_head());
                    assert!(header.l1_finalized() >= parent.l1_finalized());
                }
                parent = header;
            }

            if parent.height() >= success_height {
                break;
            }
        }
    }
}