1mod external_event_handler;
2mod message_compat_tests;
3mod proposal_fetcher;
4mod request_response;
5
6pub mod api;
7pub mod catchup;
8pub mod context;
9pub mod genesis;
10pub mod keyset;
11pub mod network;
12pub mod options;
13pub mod persistence;
14pub mod run;
15pub mod state;
16pub mod state_cert;
17pub mod state_signature;
18pub mod util;
19
20use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration};
21
22use alloy::primitives::U256;
23use anyhow::Context;
24use async_lock::{Mutex, RwLock};
25use catchup::{ParallelStateCatchup, StatePeers};
26use context::SequencerContext;
27use derivative::Derivative;
28use espresso_types::{
29 BackoffParams, EpochCommittees, EpochRewardsCalculator, L1ClientOptions, NodeState, PubKey,
30 SeqTypes, ValidatedState,
31 traits::{EventConsumer, MembershipPersistence},
32 v0::traits::SequencerPersistence,
33 v0_3::Fetcher,
34};
35pub use genesis::Genesis;
36use genesis::L1Finalized;
37use hotshot::{
38 traits::implementations::{
39 CdnMetricsValue, CdnTopic, Cliquenet, CombinedNetworks, CompatNetwork, GossipConfig,
40 KeyPair, Libp2pNetwork, MemoryNetwork, PushCdnNetwork, RequestResponseConfig,
41 WrappedSignatureKey, derive_libp2p_multiaddr, derive_libp2p_peer_id,
42 },
43 types::SignatureKey,
44};
45use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
46use hotshot_orchestrator::client::{OrchestratorClient, get_complete_config};
47use hotshot_types::{
48 ValidatorConfig,
49 addr::NetAddr,
50 data::ViewNumber,
51 epoch_membership::EpochMembershipCoordinator,
52 light_client::{StateKeyPair, StateSignKey},
53 signature_key::{BLSPrivKey, BLSPubKey},
54 traits::{
55 metrics::{Metrics, NoMetrics},
56 network::ConnectedNetwork,
57 node_implementation::{NodeImplementation, NodeType},
58 storage::Storage,
59 },
60 utils::BuilderCommitment,
61 x25519,
62};
63use libp2p::Multiaddr;
64use moka::future::Cache;
65use network::libp2p::split_off_peer_id;
66use options::Identity;
67pub use options::Options;
68use proposal_fetcher::ProposalFetcherConfig;
69pub use run::main;
70use serde::{Deserialize, Serialize};
71use tokio::select;
72use tracing::info;
73use url::Url;
74use vbs::version::StaticVersion;
75
76use crate::request_response::data_source::Storage as RequestResponseStorage;
77
/// Limit passed to `reload_stake` on the `EpochCommittees` membership when `init_node` reloads
/// stake tables at startup.
// NOTE(review): presumably the number of most recent stake tables to reload — confirm against
// `EpochCommittees::reload_stake`.
pub const RECENT_STAKE_TABLES_LIMIT: u64 = 20;
79
/// Zero-sized marker type that selects the network (`N`) and persistence (`P`) implementations
/// of a sequencer node.
///
/// The only field is a `PhantomData`, so no value of `N` or `P` is stored. The `derivative`
/// impls are declared with empty bounds (`bound = ""`) so that they are not conditional on `N`
/// or `P` implementing the corresponding traits.
#[derive(Derivative, Serialize, Deserialize)]
#[derivative(
    Copy(bound = ""),
    Debug(bound = ""),
    Default(bound = ""),
    PartialEq(bound = ""),
    Eq(bound = ""),
    Hash(bound = "")
)]
pub struct Node<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(PhantomData<fn(&N, &P)>);
91
// `Clone` is written by hand here rather than listed in the `derivative` attribute on `Node`.
// Because `Node` is `Copy`, cloning is a plain dereference.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Clone for Node<N, P> {
    /// Returns a copy of this marker value.
    fn clone(&self) -> Self {
        *self
    }
}
99
/// Static version marker (`StaticVersion<0, 1>`) used in this module as the version parameter
/// of the `StatePeers` clients.
pub type SequencerApiVersion = StaticVersion<0, 1>;
101
/// Binds the `Node` marker to HotShot's `NodeImplementation`: the network is `N` and the
/// storage is a shared `Arc<P>`.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> NodeImplementation<SeqTypes>
    for Node<N, P>
{
    type Network = N;
    type Storage = Arc<P>;
}
108
/// Networking, key and catchup parameters consumed by `init_node`.
///
/// The per-field notes below describe how `init_node` uses each value.
#[derive(Clone, Debug)]
pub struct NetworkParams {
    /// Endpoint passed to `PushCdnNetwork::new`.
    pub cdn_endpoint: String,
    /// URL used to construct the `OrchestratorClient`.
    pub orchestrator_url: Url,
    /// Passed (wrapped in `Some`) to `SequencerContext::init`.
    pub state_relay_server_url: Url,

    /// Builder URLs. When non-empty, they replace `builder_urls` in the HotShot config.
    pub builder_urls: Vec<Url>,

    /// BLS private staking key; the node's public key is derived from it.
    pub private_staking_key: BLSPrivKey,
    /// Signing key from which the node's `StateKeyPair` is built.
    pub private_state_key: StateSignKey,
    /// URLs of the peers registered as a remote state catchup provider.
    pub state_peers: Vec<Url>,
    /// Peers to fetch the network config from when none is stored in persistence. If `None`,
    /// the config is requested from the orchestrator instead.
    pub config_peers: Option<Vec<Url>>,
    /// Backoff parameters handed to the catchup providers.
    pub catchup_backoff: BackoffParams,
    /// Base timeout handed to `StatePeers::from_urls`.
    pub catchup_base_timeout: Duration,
    /// Timeout handed to `ParallelStateCatchup::new`.
    pub local_catchup_timeout: Duration,
    /// Not read by `init_node`.
    // NOTE(review): presumably the publicly reachable URL of this node's API — confirm.
    pub public_api_url: Option<Url>,
    /// Address passed to `Cliquenet::create` and stored as `p2p_addr` in the validator config.
    pub cliquenet_bind_addr: NetAddr,
    /// x25519 secret key used for the validator config's x25519 keypair and for Cliquenet.
    pub x25519_secret_key: x25519::SecretKey,
    /// Address string converted with `derive_libp2p_multiaddr`; the result is sent to the
    /// orchestrator when the config is loaded from it.
    pub libp2p_advertise_address: String,
    /// Address string converted with `derive_libp2p_multiaddr`; the result is passed to
    /// `Libp2pNetwork::from_config`.
    pub libp2p_bind_address: String,
    /// If set, replaces the bootstrap nodes of the loaded libp2p config (ignored with a warning
    /// when the network config has no libp2p section).
    pub libp2p_bootstrap_nodes: Option<Vec<Multiaddr>>,

    // Unless noted otherwise, each `libp2p_*` field below is copied into the `GossipConfig`
    // field of the same name without the `libp2p_` prefix.
    /// Copied into `GossipConfig::heartbeat_interval`.
    pub libp2p_heartbeat_interval: Duration,

    /// Copied into `GossipConfig::history_gossip`.
    pub libp2p_history_gossip: usize,
    /// Copied into `GossipConfig::history_length`.
    pub libp2p_history_length: usize,

    /// Copied into `GossipConfig::mesh_n`.
    pub libp2p_mesh_n: usize,
    /// Copied into `GossipConfig::mesh_n_high`.
    pub libp2p_mesh_n_high: usize,
    /// Copied into `GossipConfig::mesh_n_low`.
    pub libp2p_mesh_n_low: usize,
    /// Copied into `GossipConfig::mesh_outbound_min`.
    pub libp2p_mesh_outbound_min: usize,

    /// Copied into `GossipConfig::max_transmit_size`.
    pub libp2p_max_gossip_transmit_size: usize,

    /// Used for both `request_size_maximum` and `response_size_maximum` of the
    /// `RequestResponseConfig`.
    pub libp2p_max_direct_transmit_size: u64,

    /// Copied into `GossipConfig::max_ihave_length`.
    pub libp2p_max_ihave_length: usize,

    /// Copied into `GossipConfig::max_ihave_messages`.
    pub libp2p_max_ihave_messages: usize,

    /// Copied into `GossipConfig::published_message_ids_cache_time`.
    pub libp2p_published_message_ids_cache_time: Duration,

    /// Copied into `GossipConfig::iwant_followup_time`.
    pub libp2p_iwant_followup_time: Duration,

    /// Copied into `GossipConfig::max_messages_per_rpc`.
    pub libp2p_max_messages_per_rpc: Option<usize>,

    /// Copied into `GossipConfig::gossip_retransmission`.
    pub libp2p_gossip_retransmission: u32,

    /// Copied into `GossipConfig::flood_publish`.
    pub libp2p_flood_publish: bool,

    /// Copied into `GossipConfig::duplicate_cache_time`.
    pub libp2p_duplicate_cache_time: Duration,

    /// Copied into `GossipConfig::fanout_ttl`.
    pub libp2p_fanout_ttl: Duration,

    /// Copied into `GossipConfig::heartbeat_initial_delay`.
    pub libp2p_heartbeat_initial_delay: Duration,

    /// Copied into `GossipConfig::gossip_factor`.
    pub libp2p_gossip_factor: f64,

    /// Copied into `GossipConfig::gossip_lazy`.
    pub libp2p_gossip_lazy: usize,
}
201
/// Parameters `init_node` uses to create the L1 client.
pub struct L1Params {
    /// L1 provider URLs passed to `connect`.
    pub urls: Vec<Url>,
    /// Client options on which `with_metrics` and `connect` are called.
    pub options: L1ClientOptions,
}
206
/// Initializes a production sequencer node and returns its [`SequencerContext`].
///
/// In order, this function:
/// 1. publishes build, identity and key information as text metrics;
/// 2. derives the libp2p addresses, validator config and libp2p peer ID from `network_params`;
/// 3. obtains the network config from persistence, from `config_peers`, or from the
///    orchestrator (in that order of preference) and overrides parts of it from `genesis` and
///    `network_params`;
/// 4. connects to L1, validates the fee contract, spawns the L1 tasks and waits for the L1
///    genesis block;
/// 5. builds the genesis state, the state catchup providers, the stake table fetcher and the
///    epoch membership coordinator;
/// 6. creates the CDN, libp2p and Cliquenet networks and initializes the consensus context.
///
/// If the config was loaded from the orchestrator, the returned context is additionally told to
/// wait for the orchestrator.
///
/// # Errors
///
/// Returns an error if any of the fallible steps above fails (address or peer ID derivation,
/// loading/fetching/saving the network config, bootstrap node parsing, network creation, L1
/// client creation, fee contract validation, stake table lookup, or context initialization).
#[allow(clippy::too_many_arguments)]
pub async fn init_node<P>(
    genesis: Genesis,
    network_params: NetworkParams,
    metrics: Box<dyn Metrics>,
    mut persistence: P,
    l1_params: L1Params,
    storage: Option<RequestResponseStorage>,
    event_consumer: impl EventConsumer + 'static,
    is_da: bool,
    identity: Identity,
    proposal_fetcher_config: ProposalFetcherConfig,
) -> anyhow::Result<SequencerContext<network::Production, P>>
where
    P: SequencerPersistence + MembershipPersistence + DhtPersistentStorage,
    Arc<P>: Storage<SeqTypes>,
{
    // Expose git/build information as a text metric.
    let info = espresso_utils::build_info!();
    metrics
        .text_family(
            "version".into(),
            vec!["rev".into(), "desc".into(), "timestamp".into()],
        )
        .create(vec![
            info.git_sha.into(),
            info.git_describe.into(),
            info.git_commit_timestamp.into(),
        ]);

    metrics
        .text_family(
            "build_info".into(),
            vec![
                "modified".into(),
                "branch".into(),
                "debug".into(),
                "features".into(),
            ],
        )
        .create(vec![
            info.git_dirty.into(),
            info.git_branch.into(),
            info.is_debug.to_string(),
            env!("VERGEN_CARGO_FEATURES").into(),
        ]);

    // Expose the operator-supplied node identity; unset fields become empty strings.
    metrics
        .text_family(
            "node_identity_general".into(),
            vec![
                "name".into(),
                "description".into(),
                "company_name".into(),
                "company_website".into(),
                "operating_system".into(),
                "node_type".into(),
                "network_type".into(),
            ],
        )
        .create(vec![
            identity.node_name.unwrap_or_default(),
            identity.node_description.unwrap_or_default(),
            identity.company_name.unwrap_or_default(),
            identity
                .company_website
                .map(|u| u.into())
                .unwrap_or_default(),
            identity.operating_system.unwrap_or_default(),
            identity.node_type.unwrap_or_default(),
            identity.network_type.unwrap_or_default(),
        ]);

    // Expose the node's location.
    metrics
        .text_family(
            "node_identity_location".into(),
            vec!["country".into(), "latitude".into(), "longitude".into()],
        )
        .create(vec![
            identity.country_code.unwrap_or_default(),
            identity.latitude.map(|l| l.to_string()).unwrap_or_default(),
            identity
                .longitude
                .map(|l| l.to_string())
                .unwrap_or_default(),
        ]);

    // Expose the node's icon URLs: the 14x14 icons are reported as "small", the 24x24 as "large".
    metrics
        .text_family(
            "node_identity_icon".into(),
            vec![
                "small_1x".into(),
                "small_2x".into(),
                "small_3x".into(),
                "large_1x".into(),
                "large_2x".into(),
                "large_3x".into(),
            ],
        )
        .create(vec![
            identity
                .icon_14x14_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
        ]);

    // Expose the node's public staking key.
    let pub_key = BLSPubKey::from_private(&network_params.private_staking_key);
    metrics
        .text_family("node".into(), vec!["key".into()])
        .create(vec![pub_key.to_string()]);

    // Convert the configured libp2p address strings into multiaddresses.
    let libp2p_bind_address = derive_libp2p_multiaddr(&network_params.libp2p_bind_address)
        .with_context(|| {
            format!(
                "Failed to derive Libp2p bind address of {}",
                &network_params.libp2p_bind_address
            )
        })?;
    let libp2p_advertise_address =
        derive_libp2p_multiaddr(&network_params.libp2p_advertise_address).with_context(|| {
            format!(
                "Failed to derive Libp2p advertise address of {}",
                &network_params.libp2p_advertise_address
            )
        })?;

    info!("Libp2p bind address: {}", libp2p_bind_address);
    info!("Libp2p advertise address: {}", libp2p_advertise_address);

    // Build the orchestrator client and this node's validator config.
    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
    let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);
    let validator_config = ValidatorConfig {
        public_key: pub_key,
        private_key: network_params.private_staking_key,
        stake_value: U256::ONE,
        state_public_key: state_key_pair.ver_key(),
        state_private_key: state_key_pair.sign_key(),
        is_da,
        x25519_keypair: Some(x25519::Keypair::from(&network_params.x25519_secret_key)),
        p2p_addr: Some(network_params.cliquenet_bind_addr.clone()),
    };

    // The libp2p peer ID is derived from the staking private key.
    let libp2p_public_key = derive_libp2p_peer_id::<<SeqTypes as NodeType>::SignatureKey>(
        &validator_config.private_key,
    )
    .with_context(|| "Failed to derive Libp2p peer ID")?;

    info!("Starting Libp2p with PeerID: {libp2p_public_key}");

    // Obtain the network config. A config stored in persistence wins; otherwise it is fetched
    // from the configured peers, and only as a last resort from the orchestrator. The second
    // tuple element records whether the orchestrator was used.
    let loaded_network_config_from_persistence = persistence.load_config().await?;
    let (mut network_config, wait_for_orchestrator) = match (
        loaded_network_config_from_persistence,
        network_params.config_peers,
    ) {
        (Some(config), _) => {
            tracing::warn!("loaded network config from storage, rejoining existing network");
            (config, false)
        },
        (None, Some(peers)) => {
            tracing::warn!(?peers, "loading network config from peers");
            let peers = StatePeers::<SequencerApiVersion>::from_urls(
                peers,
                network_params.catchup_backoff,
                network_params.catchup_base_timeout,
                &NoMetrics,
            );
            let config = peers.fetch_config(validator_config.clone()).await?;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            // Store the fetched config so the next start takes the first branch.
            persistence.save_config(&config).await?;
            (config, false)
        },
        (None, None) => {
            tracing::warn!("loading network config from orchestrator");
            tracing::warn!(
                "waiting for other nodes to connect, DO NOT RESTART until fully connected"
            );
            let config = get_complete_config(
                &orchestrator_client,
                validator_config.clone(),
                Some(libp2p_advertise_address),
                Some(libp2p_public_key),
            )
            .await?
            .0;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            persistence.save_config(&config).await?;
            tracing::warn!("all nodes connected");
            (config, true)
        },
    };

    // Apply HotShot parameters of the configured upgrade, if genesis defines one for the
    // upgrade version.
    if let Some(upgrade) = genesis.upgrades.get(&genesis.upgrade_version) {
        upgrade.set_hotshot_config_parameters(&mut network_config.config);
    }

    // Override the builder URLs only when some were supplied; the conversion is unwrapped
    // inside this non-empty check.
    if !network_params.builder_urls.is_empty() {
        network_config.config.builder_urls = network_params.builder_urls.try_into().unwrap();
    }

    // Override epoch-related settings in the HotShot config with the values from genesis,
    // falling back to defaults where genesis leaves them unset.
    let epoch_height = genesis.epoch_height.unwrap_or_default();
    let drb_difficulty = genesis.drb_difficulty.unwrap_or_default();
    let drb_upgrade_difficulty = genesis.drb_upgrade_difficulty.unwrap_or_default();
    let epoch_start_block = genesis.epoch_start_block.unwrap_or_default();
    let stake_table_capacity = genesis
        .stake_table_capacity
        .unwrap_or(hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY);

    let version_upgrade = versions::Upgrade::new(genesis.base_version, genesis.upgrade_version);

    tracing::warn!("setting epoch_height={epoch_height:?}");
    tracing::warn!("setting drb_difficulty={drb_difficulty:?}");
    tracing::warn!("setting drb_upgrade_difficulty={drb_upgrade_difficulty:?}");
    tracing::warn!("setting epoch_start_block={epoch_start_block:?}");
    tracing::warn!("setting stake_table_capacity={stake_table_capacity:?}");
    tracing::warn!("setting version_upgrade={version_upgrade}");
    network_config.config.epoch_height = epoch_height;
    network_config.config.drb_difficulty = drb_difficulty;
    network_config.config.drb_upgrade_difficulty = drb_upgrade_difficulty;
    network_config.config.epoch_start_block = epoch_start_block;
    network_config.config.stake_table_capacity = stake_table_capacity;

    if let Some(da_committees) = &genesis.da_committees {
        tracing::warn!("setting da_committees from genesis: {da_committees:?}");
        network_config.config.da_committees = da_committees.clone();
    }

    // Replace the libp2p bootstrap nodes with the supplied ones, if any. Each multiaddress is
    // run through `split_off_peer_id`; a failure there aborts initialization.
    if let Some(bootstrap_nodes) = network_params.libp2p_bootstrap_nodes {
        if let Some(libp2p_config) = network_config.libp2p_config.as_mut() {
            libp2p_config.bootstrap_nodes = bootstrap_nodes
                .into_iter()
                .map(split_off_peer_id)
                .collect::<Result<Vec<_>, _>>()
                .with_context(|| "Failed to parse peer ID from bootstrap node")?;
        } else {
            tracing::warn!("No libp2p configuration found, ignoring supplied bootstrap nodes");
        }
    }

    let node_index = network_config.node_index;

    // Every node subscribes to the global CDN topic; DA nodes additionally to the DA topic.
    let topics = {
        let mut topics = vec![CdnTopic::Global];
        if is_da {
            topics.push(CdnTopic::Da);
        }
        topics
    };

    // Create the CDN network.
    let cdn_network = PushCdnNetwork::new(
        network_params.cdn_endpoint,
        topics,
        KeyPair {
            public_key: WrappedSignatureKey(validator_config.public_key),
            private_key: validator_config.private_key.clone(),
        },
        CdnMetricsValue::new(&*metrics),
    )
    .with_context(|| format!("Failed to create CDN network {node_index}"))?;

    // Gossip configuration for libp2p, taken field by field from `network_params`.
    let gossip_config = GossipConfig {
        heartbeat_interval: network_params.libp2p_heartbeat_interval,
        history_gossip: network_params.libp2p_history_gossip,
        history_length: network_params.libp2p_history_length,
        mesh_n: network_params.libp2p_mesh_n,
        mesh_n_high: network_params.libp2p_mesh_n_high,
        mesh_n_low: network_params.libp2p_mesh_n_low,
        mesh_outbound_min: network_params.libp2p_mesh_outbound_min,
        max_ihave_messages: network_params.libp2p_max_ihave_messages,
        max_transmit_size: network_params.libp2p_max_gossip_transmit_size,
        max_ihave_length: network_params.libp2p_max_ihave_length,
        published_message_ids_cache_time: network_params.libp2p_published_message_ids_cache_time,
        iwant_followup_time: network_params.libp2p_iwant_followup_time,
        max_messages_per_rpc: network_params.libp2p_max_messages_per_rpc,
        gossip_retransmission: network_params.libp2p_gossip_retransmission,
        flood_publish: network_params.libp2p_flood_publish,
        duplicate_cache_time: network_params.libp2p_duplicate_cache_time,
        fanout_ttl: network_params.libp2p_fanout_ttl,
        heartbeat_initial_delay: network_params.libp2p_heartbeat_initial_delay,
        gossip_factor: network_params.libp2p_gossip_factor,
        gossip_lazy: network_params.libp2p_gossip_lazy,
    };

    // The same limit applies to direct-message requests and responses.
    let request_response_config = RequestResponseConfig {
        request_size_maximum: network_params.libp2p_max_direct_transmit_size,
        response_size_maximum: network_params.libp2p_max_direct_transmit_size,
    };

    // Connect to L1.
    let l1_client = l1_params
        .options
        .with_metrics(&*metrics)
        .connect(l1_params.urls)
        .with_context(|| "failed to create L1 client")?;

    info!("Validating fee contract");

    genesis.validate_fee_contract(&l1_client).await?;

    info!("Fee contract validated. Spawning L1 tasks");

    l1_client.spawn_tasks().await;

    info!(
        "L1 tasks spawned. Waiting for L1 genesis: {:?}",
        genesis.l1_finalized
    );

    // Resolve the L1 genesis block: either given directly, or awaited by number or timestamp.
    let l1_genesis = match genesis.l1_finalized {
        L1Finalized::Block(b) => b,
        L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
        L1Finalized::Timestamp { timestamp } => {
            l1_client
                .wait_for_finalized_block_with_timestamp(U256::from(timestamp.unix_timestamp()))
                .await
        },
    };

    info!("L1 genesis found: {:?}", l1_genesis);

    // Build the genesis state from the genesis header's chain config and prefund the accounts
    // listed in genesis.
    let genesis_chain_config = genesis.header.chain_config;
    let mut genesis_state = ValidatedState {
        chain_config: genesis_chain_config.into(),
        ..Default::default()
    };
    for (address, amount) in genesis.accounts {
        tracing::warn!(%address, %amount, "Prefunding account for demo");
        genesis_state.prefund_account(address, amount);
    }

    // Start with an empty set of catchup providers and add them one by one below.
    let state_catchup_providers =
        ParallelStateCatchup::new(&[], network_params.local_catchup_timeout);

    // Remote catchup from the configured state peers.
    let state_peers = StatePeers::<SequencerApiVersion>::from_urls(
        network_params.state_peers,
        network_params.catchup_backoff,
        network_params.catchup_base_timeout,
        &*metrics,
    );
    state_catchup_providers.add_provider(Arc::new(state_peers));

    // Local catchup from persistence, if persistence can act as a catchup provider. Failure is
    // not fatal: the node then relies on remote catchup only.
    match persistence
        .clone()
        .into_catchup_provider(network_params.catchup_backoff)
    {
        Ok(catchup) => {
            state_catchup_providers.add_provider(Arc::new(catchup));
        },
        Err(e) => {
            tracing::warn!(
                "Failed to create local catchup provider: {e:#}. Only using remote catchup."
            );
        },
    };

    persistence.enable_metrics(&*metrics);

    // Create the fetcher and start its update loop.
    let fetcher = Fetcher::new(
        Arc::new(state_catchup_providers.clone()),
        Arc::new(Mutex::new(persistence.clone())),
        l1_client.clone(),
        genesis.chain_config,
    );

    info!("Spawning update loop");

    fetcher.spawn_update_loop().await;
    info!("Update loop spawned. Fetching block reward");

    // A failure to fetch the fixed block reward is tolerated and mapped to `None`.
    let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
    info!("Block reward fetched: {:?}", block_reward);
    // Create the membership from the known nodes in the network config and reload stake.
    let mut membership = EpochCommittees::new_stake(
        network_config.config.known_nodes_with_stake.clone(),
        network_config.config.known_da_nodes.clone(),
        block_reward,
        fetcher,
        epoch_height,
    );
    info!("Membership created. Reloading stake");
    membership.reload_stake(RECENT_STAKE_TABLES_LIMIT).await;
    info!("Stake reloaded");

    // From here on, membership and persistence are shared behind `Arc`s.
    let membership: Arc<RwLock<EpochCommittees>> = Arc::new(RwLock::new(membership));
    let persistence = Arc::new(persistence);
    let coordinator = EpochMembershipCoordinator::new(
        membership,
        network_config.config.epoch_height,
        &persistence.clone(),
    );

    let epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));

    let instance_state = NodeState {
        chain_config: genesis.chain_config,
        genesis_chain_config,
        l1_client,
        genesis_header: genesis.header,
        genesis_state,
        l1_genesis: Some(l1_genesis),
        node_id: node_index,
        upgrades: genesis.upgrades,
        current_version: genesis.base_version,
        epoch_height: Some(epoch_height),
        state_catchup: Arc::new(state_catchup_providers.clone()),
        coordinator: coordinator.clone(),
        genesis_version: genesis.genesis_version,
        epoch_start_block: genesis.epoch_start_block.unwrap_or_default(),
        epoch_rewards_calculator,
        // Single-entry caches; only the finalized HotShot height entry has a time-to-live (30 s).
        light_client_contract_address: Cache::builder().max_capacity(1).build(),
        token_contract_address: Cache::builder().max_capacity(1).build(),
        finalized_hotshot_height: Cache::builder()
            .max_capacity(1)
            .time_to_live(Duration::from_secs(30))
            .build(),
    };

    // Create the libp2p network and combine it with the CDN network.
    let combined_network = {
        info!("Initializing Libp2p network");
        let p2p_network = Libp2pNetwork::from_config(
            network_config.clone(),
            persistence.clone(),
            gossip_config,
            request_response_config,
            libp2p_bind_address,
            &validator_config.public_key,
            &validator_config.private_key,
            hotshot::traits::implementations::Libp2pMetricsValue::new(&*metrics),
        )
        .await
        .with_context(|| {
            format!(
                "Failed to create libp2p network on node {node_index}; binding to {:?}",
                network_params.libp2p_bind_address
            )
        })?;

        info!("Libp2p network initialized");

        // Proceed as soon as either of the two networks reports ready.
        tracing::warn!("Waiting for at least one connection to be initialized");
        select! {
            _ = cdn_network.wait_for_ready() => {
                tracing::warn!("CDN connection initialized");
            },
            _ = p2p_network.wait_for_ready() => {
                tracing::warn!("P2P connection initialized");
            },
        };

        CombinedNetworks::new(cdn_network, p2p_network, Some(Duration::from_secs(1)))
    };

    // Create the Cliquenet network and wrap it together with the combined network.
    let network = {
        // Peers are the stake table entries that carry connection info; entries without it are
        // skipped.
        let peers = coordinator
            .stake_table_for_epoch(None)
            .await?
            .stake_table()
            .await
            .0
            .into_iter()
            .filter_map(|cfg| Some((cfg.stake_table_entry.stake_key, cfg.connect_info?)));

        let c = Cliquenet::<PubKey>::create(
            "sequencer",
            pub_key,
            network_params.x25519_secret_key.into(),
            network_params.cliquenet_bind_addr.clone(),
            peers,
            metrics.clone(),
        )
        .await?;

        Arc::new(CompatNetwork::new(c, combined_network).await)
    };

    let mut ctx = SequencerContext::init(
        network_config,
        version_upgrade,
        validator_config,
        coordinator,
        instance_state,
        storage,
        state_catchup_providers,
        persistence,
        network.clone(),
        Some(network_params.state_relay_server_url),
        &*metrics,
        genesis.stake_table.capacity,
        event_consumer,
        proposal_fetcher_config,
    )
    .await?;

    // Hand the context's upgrade lock to the network.
    network.set_upgrade_lock(ctx.upgrade_lock().await);

    // Only nodes that got their config from the orchestrator wait for it.
    if wait_for_orchestrator {
        ctx = ctx.wait_for_orchestrator(orchestrator_client);
    }

    Ok(ctx)
}
767
768pub fn empty_builder_commitment() -> BuilderCommitment {
769 BuilderCommitment::from_bytes([])
770}
771
772#[cfg(any(test, feature = "testing"))]
773pub mod testing {
774 use std::{
775 cmp::max,
776 collections::{BTreeMap, HashMap},
777 time::Duration,
778 };
779
780 use alloy::{
781 network::EthereumWallet,
782 node_bindings::{Anvil, AnvilInstance},
783 primitives::{Address, U256},
784 providers::{
785 Provider, ProviderBuilder, RootProvider,
786 fillers::{
787 BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
788 },
789 layers::AnvilProvider,
790 },
791 signers::{
792 k256::ecdsa::SigningKey,
793 local::{LocalSigner, PrivateKeySigner},
794 },
795 };
796 use async_lock::RwLock;
797 use catchup::NullStateCatchup;
798 use committable::Committable;
799 use espresso_contract_deployer::{
800 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
801 network_config::light_client_genesis_from_stake_table,
802 };
803 use espresso_types::{
804 EpochVersion, Event, FeeAccount, L1Client, NetworkConfig, PubKey, SeqTypes, Transaction,
805 Upgrade, UpgradeMap,
806 eth_signature_key::EthKeyPair,
807 v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup},
808 };
809 use futures::{
810 future::join_all,
811 stream::{Stream, StreamExt},
812 };
813 use hotshot::{
814 traits::{
815 BlockPayload,
816 implementations::{MasterMap, MemoryNetwork},
817 },
818 types::EventType::{self, Decide},
819 };
820 use hotshot_builder_refactored::service::{
821 BuilderConfig as LegacyBuilderConfig, GlobalState as LegacyGlobalState,
822 };
823 use hotshot_testing::block_builder::{
824 BuilderTask, SimpleBuilderImplementation, TestBuilderImplementation,
825 };
826 use hotshot_types::{
827 HotShotConfig, PeerConfig,
828 data::EpochNumber,
829 event::LeafInfo,
830 light_client::StateKeyPair,
831 signature_key::BLSKeyPair,
832 traits::{
833 EncodeBytes, block_contents::BlockHeader, metrics::NoMetrics, network::Topic,
834 signature_key::BuilderSignatureKey,
835 },
836 };
837 use rand::SeedableRng as _;
838 use rand_chacha::ChaCha20Rng;
839 use staking_cli::demo::{DelegationConfig, StakingTransactions};
840 use test_utils::reserve_tcp_port;
841 use tokio::spawn;
842 use vbs::version::{StaticVersionType, Version};
843 use versions::EPOCH_VERSION;
844
845 use super::*;
846 use crate::{
847 catchup::ParallelStateCatchup,
848 persistence::no_storage::{self, NoStorage},
849 };
850
 /// Stake table capacity passed to `light_client_genesis_from_stake_table` in
 /// `TestConfigBuilder::set_upgrades`.
 const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
 /// Value of `txn_channel_capacity` for the legacy builder started by `run_legacy_builder`.
 const BUILDER_CHANNEL_CAPACITY_FOR_TEST: usize = 128;
 /// Concrete `AnvilProvider` type stored in `TestConfigBuilder::anvil_provider`: an
 /// `AnvilProvider` around a `FillProvider` with the gas, blob gas, nonce and chain ID fillers
 /// on top of a `RootProvider`.
 type AnvilFillProvider = AnvilProvider<
     FillProvider<
         JoinFill<
             alloy::providers::Identity,
             JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
         >,
         RootProvider,
     >,
 >;
 /// `BuilderTask` implementation for tests that wraps the legacy builder's global state.
 struct LegacyBuilderImplementation {
     /// Shared legacy builder state whose event loop is run by `BuilderTask::start`.
     global_state: Arc<LegacyGlobalState<SeqTypes>>,
 }
865
866 impl BuilderTask<SeqTypes> for LegacyBuilderImplementation {
867 fn start(
868 self: Box<Self>,
869 stream: Box<
870 dyn futures::prelude::Stream<Item = hotshot::types::Event<SeqTypes>>
871 + std::marker::Unpin
872 + Send
873 + 'static,
874 >,
875 ) {
876 spawn(async move {
877 let res = self.global_state.start_event_loop(stream).await;
878 tracing::error!(?res, "testing legacy builder service exited");
879 });
880 }
881 }
882
883 pub async fn run_legacy_builder<const NUM_NODES: usize>(
884 port: Option<u16>,
885 max_block_size: Option<u64>,
886 ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
887 let builder_key_pair = TestConfig::<0>::builder_key();
888 let port = match port {
889 Some(p) => p,
890 None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
891 };
892
893 let url: Url = format!("http://localhost:{port}")
895 .parse()
896 .expect("Failed to parse builder URL");
897
898 let global_state = LegacyGlobalState::new(
900 LegacyBuilderConfig {
901 builder_keys: (builder_key_pair.fee_account(), builder_key_pair),
902 max_api_waiting_time: Duration::from_secs(1),
903 max_block_size_increment_period: Duration::from_secs(60),
904 maximize_txn_capture_timeout: Duration::from_millis(100),
905 txn_garbage_collect_duration: Duration::from_secs(60),
906 txn_channel_capacity: BUILDER_CHANNEL_CAPACITY_FOR_TEST,
907 tx_status_cache_capacity: 81920,
908 base_fee: 10,
909 },
910 NodeState::default(),
911 max_block_size.unwrap_or(300),
912 NUM_NODES,
913 );
914
915 let app = Arc::clone(&global_state)
917 .into_app()
918 .expect("Failed to create builder tide-disco app");
919
920 spawn(async move {
921 app.serve(
922 format!("http://0.0.0.0:{port}")
923 .parse::<Url>()
924 .expect("Failed to parse builder listener"),
925 EpochVersion::instance(),
926 )
927 .await
928 });
929
930 (Box::new(LegacyBuilderImplementation { global_state }), url)
932 }
933
934 pub async fn run_test_builder<const NUM_NODES: usize>(
935 port: Option<u16>,
936 ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
937 let port = match port {
938 Some(p) => p,
939 None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
940 };
941
942 let url: Url = format!("http://localhost:{port}")
944 .parse()
945 .expect("Failed to parse builder URL");
946 tracing::info!("Starting test builder on {url}");
947
948 let task = <SimpleBuilderImplementation as TestBuilderImplementation<SeqTypes>>::start(
949 NUM_NODES,
950 format!("http://0.0.0.0:{port}")
951 .parse()
952 .expect("Failed to parse builder listener"),
953 (),
954 HashMap::new(),
955 )
956 .await;
957
958 (task, url)
959 }
960
 /// Builder for a `TestConfig` of `NUM_NODES` nodes.
 ///
 /// Each field is moved unchanged into the `TestConfig` field of the same name by `build`.
 pub struct TestConfigBuilder<const NUM_NODES: usize> {
     /// HotShot config under construction; adjusted by the epoch, stake table capacity and
     /// upgrade setters.
     config: HotShotConfig<SeqTypes>,
     /// BLS private keys of the test nodes; used by `set_upgrades` to build the validator list.
     priv_keys: Vec<BLSPrivKey>,
     /// State key pairs of the test nodes; used by `set_upgrades` to build the validator list.
     state_key_pairs: Vec<StateKeyPair>,
     /// Passed through to `TestConfig` unchanged.
     // NOTE(review): presumably the shared registry of the in-memory test networks — confirm.
     master_map: Arc<MasterMap<PubKey>>,
     /// L1 RPC URL; set by `l1_url` or, from the Anvil endpoint, by `anvil_provider`.
     l1_url: Url,
     /// L1 client options.
     l1_opt: L1ClientOptions,
     /// Anvil-backed provider set by `anvil_provider`; cleared again by `l1_url`.
     anvil_provider: Option<AnvilFillProvider>,
     /// Signer whose wallet deploys the contracts in `set_upgrades`.
     signer: LocalSigner<SigningKey>,
     /// Optional state relay URL.
     state_relay_url: Option<Url>,
     /// Optional builder port.
     builder_port: Option<u16>,
     /// Upgrades keyed by version.
     upgrades: BTreeMap<Version, Upgrade>,
 }
974
975 pub fn staking_priv_keys(
976 priv_keys: &[BLSPrivKey],
977 state_key_pairs: &[StateKeyPair],
978 num_nodes: usize,
979 ) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
980 let seed = [42u8; 32];
981 let mut rng = ChaCha20Rng::from_seed(seed); let eth_key_pairs = (0..num_nodes).map(|_| SigningKey::random(&mut rng).into());
983 eth_key_pairs
984 .zip(priv_keys.iter())
985 .zip(state_key_pairs.iter())
986 .map(|((eth, bls), state)| (eth, bls.clone().into(), state.clone()))
987 .collect()
988 }
989
 /// Setters and the final `build` step of the test config builder. Every setter consumes the
 /// builder and returns it, so calls can be chained.
 impl<const NUM_NODES: usize> TestConfigBuilder<NUM_NODES> {
     /// Sets the builder port.
     pub fn builder_port(mut self, builder_port: Option<u16>) -> Self {
         self.builder_port = builder_port;
         self
     }

     /// Sets the state relay URL.
     pub fn state_relay_url(mut self, url: Url) -> Self {
         self.state_relay_url = Some(url);
         self
     }

     /// Uses the given Anvil instance as L1.
     ///
     /// Sets `l1_url` to the instance's endpoint and stores an `AnvilProvider` built from an
     /// `L1Client`'s provider; the instance itself is moved into an `Arc` held by that provider.
     ///
     /// # Panics
     ///
     /// Panics if the endpoint does not parse as a URL or the L1 client cannot be created.
     pub fn anvil_provider(mut self, anvil: AnvilInstance) -> Self {
         self.l1_url = anvil.endpoint().parse().unwrap();
         let l1_client = L1Client::anvil(&anvil).expect("create l1 client");
         let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
         self.anvil_provider = Some(anvil_provider);
         self
     }

     /// Sets the L1 URL and clears any Anvil provider configured earlier.
     pub fn l1_url(mut self, l1_url: Url) -> Self {
         self.anvil_provider = None;
         self.l1_url = l1_url;
         self
     }

     /// Sets the L1 client options.
     pub fn l1_opt(mut self, opt: L1ClientOptions) -> Self {
         self.l1_opt = opt;
         self
     }

     /// Sets the signer used for contract deployment in `set_upgrades`.
     pub fn signer(mut self, signer: LocalSigner<SigningKey>) -> Self {
         self.signer = signer;
         self
     }

     /// Installs `upgrades` and applies the HotShot config parameters of the upgrade registered
     /// for version `v`.
     ///
     /// # Panics
     ///
     /// Panics if `upgrades` has no entry for `v`.
     pub fn upgrades(mut self, v: Version, upgrades: BTreeMap<Version, Upgrade>) -> Self {
         let upgrade = upgrades.get(&v).unwrap();
         upgrade.set_hotshot_config_parameters(&mut self.config);
         self.upgrades = upgrades;
         self
     }

     /// Deploys the contracts to the L1 at `l1_url`, registers the test validators in the stake
     /// table contract, and replaces the upgrade map with a single upgrade for `version`.
     ///
     /// The upgrade is the one returned by `Upgrade::pos_view_based` for the deployed
     /// `StakeTableProxy` address; its HotShot parameters are applied to the config.
     ///
     /// # Panics
     ///
     /// Panics if `version` is below `EPOCH_VERSION`, or if any deployment or staking step
     /// fails.
     pub async fn set_upgrades(mut self, version: Version) -> Self {
         let upgrade = match version {
             version if version >= EPOCH_VERSION => {
                 tracing::debug!(?version, "upgrade version");
                 let blocks_per_epoch = self.config.epoch_height;
                 let epoch_start_block = self.config.epoch_start_block;

                 // Light client genesis derived from the current HotShot stake table.
                 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
                     &self.config.hotshot_stake_table(),
                     STAKE_TABLE_CAPACITY_FOR_TEST,
                 )
                 .unwrap();

                 let validators =
                     staking_priv_keys(&self.priv_keys, &self.state_key_pairs, NUM_NODES);

                 // Provider that signs with the configured signer's wallet.
                 let deployer = ProviderBuilder::new()
                     .wallet(EthereumWallet::from(self.signer.clone()))
                     .connect_http(self.l1_url.clone());

                 // The signer's address is used for every admin, proposer, executor and pauser
                 // role below.
                 let mut contracts = Contracts::new();
                 let args = DeployerArgsBuilder::default()
                     .deployer(deployer.clone())
                     .rpc_url(self.l1_url.clone())
                     .mock_light_client(true)
                     .genesis_lc_state(genesis_state)
                     .genesis_st_state(genesis_stake)
                     .blocks_per_epoch(blocks_per_epoch)
                     .epoch_start_block(epoch_start_block)
                     // At least the default escrow period, and more for long epochs.
                     .exit_escrow_period(U256::from(max(
                         blocks_per_epoch * 15 + 100,
                         DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
                     )))
                     .multisig_pauser(self.signer.address())
                     .token_name("Espresso".to_string())
                     .token_symbol("ESP".to_string())
                     .initial_token_supply(U256::from(3590000000u64))
                     .ops_timelock_delay(U256::from(0))
                     .ops_timelock_admin(self.signer.address())
                     .ops_timelock_proposers(vec![self.signer.address()])
                     .ops_timelock_executors(vec![self.signer.address()])
                     .safe_exit_timelock_delay(U256::from(10))
                     .safe_exit_timelock_admin(self.signer.address())
                     .safe_exit_timelock_proposers(vec![self.signer.address()])
                     .safe_exit_timelock_executors(vec![self.signer.address()])
                     .build()
                     .unwrap();
                 args.deploy_all(&mut contracts)
                     .await
                     .expect("failed to deploy all contracts");

                 // Create and send the staking transactions for the validators.
                 let st_addr = contracts
                     .address(Contract::StakeTableProxy)
                     .expect("StakeTableProxy address not found");
                 StakingTransactions::create(
                     self.l1_url.clone(),
                     &deployer,
                     st_addr,
                     validators,
                     None,
                     DelegationConfig::default(),
                 )
                 .await
                 .expect("stake table setup failed")
                 .apply_all()
                 .await
                 .expect("send all txns failed");

                 Upgrade::pos_view_based(st_addr)
             },
             _ => panic!("Upgrade not configured for version {version:?}"),
         };

         // The new map contains only this upgrade; previously configured upgrades are dropped.
         let mut upgrades = std::collections::BTreeMap::new();
         upgrade.set_hotshot_config_parameters(&mut self.config);
         upgrades.insert(version, upgrade);

         self.upgrades = upgrades;
         self
     }

     /// Sets `epoch_height` in the HotShot config.
     pub fn epoch_height(mut self, epoch_height: u64) -> Self {
         self.config.epoch_height = epoch_height;
         self
     }

     /// Sets `epoch_start_block` in the HotShot config.
     pub fn epoch_start_block(mut self, start_block: u64) -> Self {
         self.config.epoch_start_block = start_block;
         self
     }

     /// Consumes the builder and produces the `TestConfig`, moving every field over unchanged.
     pub fn build(self) -> TestConfig<NUM_NODES> {
         TestConfig {
             config: self.config,
             priv_keys: self.priv_keys,
             state_key_pairs: self.state_key_pairs,
             master_map: self.master_map,
             l1_url: self.l1_url,
             l1_opt: self.l1_opt,
             signer: self.signer,
             state_relay_url: self.state_relay_url,
             builder_port: self.builder_port,
             upgrades: self.upgrades,
             anvil_provider: self.anvil_provider,
         }
     }

     /// Sets `stake_table_capacity` in the HotShot config.
     pub fn stake_table_capacity(mut self, stake_table_capacity: usize) -> Self {
         self.config.stake_table_capacity = stake_table_capacity;
         self
     }
 }
1152
1153 impl<const NUM_NODES: usize> Default for TestConfigBuilder<NUM_NODES> {
1154 fn default() -> Self {
1155 let num_nodes = NUM_NODES;
1156
1157 let seed = [0; 32];
1159 let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..num_nodes)
1160 .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed(seed, i as u64))
1161 .unzip();
1162 let state_key_pairs = (0..num_nodes)
1163 .map(|i| StateKeyPair::generate_from_seed_indexed(seed, i as u64))
1164 .collect::<Vec<_>>();
1165 let known_nodes_with_stake = pub_keys
1166 .iter()
1167 .zip(&state_key_pairs)
1168 .map(|(pub_key, state_key_pair)| PeerConfig::<SeqTypes> {
1169 stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
1170 state_ver_key: state_key_pair.ver_key(),
1171 connect_info: None,
1172 })
1173 .collect::<Vec<_>>();
1174
1175 let master_map = MasterMap::new();
1176
1177 let builder_port = reserve_tcp_port().unwrap();
1178
1179 let config: HotShotConfig<SeqTypes> = HotShotConfig {
1180 fixed_leader_for_gpuvid: 0,
1181 num_nodes_with_stake: num_nodes.try_into().unwrap(),
1182 known_da_nodes: known_nodes_with_stake.clone(),
1183 da_committees: Default::default(),
1184 known_nodes_with_stake: known_nodes_with_stake.clone(),
1185 next_view_timeout: Duration::from_secs(5).as_millis() as u64,
1186 num_bootstrap: 1usize,
1187 da_staked_committee_size: num_nodes,
1188 view_sync_timeout: Duration::from_secs(1),
1189 data_request_delay: Duration::from_secs(1),
1190 builder_urls: vec1::vec1![
1191 Url::parse(&format!("http://127.0.0.1:{builder_port}")).unwrap()
1192 ],
1193 builder_timeout: Duration::from_secs(1),
1194 start_threshold: (
1195 known_nodes_with_stake.clone().len() as u64,
1196 known_nodes_with_stake.clone().len() as u64,
1197 ),
1198 start_proposing_view: 0,
1199 stop_proposing_view: 0,
1200 start_voting_view: 0,
1201 stop_voting_view: 0,
1202 start_proposing_time: 0,
1203 start_voting_time: 0,
1204 stop_proposing_time: 0,
1205 stop_voting_time: 0,
1206 epoch_height: 30,
1207 epoch_start_block: 1,
1208 stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
1209 drb_difficulty: 10,
1210 drb_upgrade_difficulty: 20,
1211 };
1212
1213 let anvil = Anvil::new()
1214 .args(["--slots-in-an-epoch", "0", "--balance", "1000000"])
1215 .spawn();
1216
1217 let l1_client = L1Client::anvil(&anvil).expect("failed to create l1 client");
1218 let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1219
1220 let l1_signer_key = anvil_provider.anvil().keys()[0].clone();
1221 let signer = LocalSigner::from(l1_signer_key);
1222
1223 Self {
1224 config,
1225 priv_keys,
1226 state_key_pairs,
1227 master_map,
1228 l1_url: anvil_provider.anvil().endpoint().parse().unwrap(),
1229 l1_opt: L1ClientOptions {
1230 stake_table_update_interval: Duration::from_secs(5),
1231 l1_events_max_block_range: 1000,
1232 l1_polling_interval: Duration::from_secs(1),
1233 subscription_timeout: Duration::from_secs(5),
1234 ..Default::default()
1235 },
1236 anvil_provider: Some(anvil_provider),
1237 signer,
1238 state_relay_url: None,
1239 builder_port: None,
1240 upgrades: Default::default(),
1241 }
1242 }
1243 }
1244
/// Fully built configuration for a `NUM_NODES`-node test network, from which
/// individual nodes are initialized.
#[derive(Clone)]
pub struct TestConfig<const NUM_NODES: usize> {
    /// HotShot configuration; cloned for every node that is initialized.
    config: HotShotConfig<SeqTypes>,
    /// Consensus private keys, indexed by node number.
    priv_keys: Vec<BLSPrivKey>,
    /// State key pairs, indexed by node number.
    state_key_pairs: Vec<StateKeyPair>,
    /// Shared map handed to each node's in-memory network.
    master_map: Arc<MasterMap<PubKey>>,
    /// URL of the L1 endpoint the nodes connect to.
    l1_url: Url,
    /// Options used when creating each node's L1 client.
    l1_opt: L1ClientOptions,
    /// Provider for the Anvil instance backing the L1, if one is attached.
    anvil_provider: Option<AnvilFillProvider>,
    /// Signer used for L1 transactions.
    signer: LocalSigner<SigningKey>,
    /// Optional state relay URL passed to every node on initialization.
    state_relay_url: Option<Url>,
    /// Optional builder port exposed through `builder_port()`.
    builder_port: Option<u16>,
    /// Upgrades keyed by the version they apply to.
    upgrades: BTreeMap<Version, Upgrade>,
}
1259
1260 impl<const NUM_NODES: usize> TestConfig<NUM_NODES> {
1261 pub fn num_nodes(&self) -> usize {
1262 self.priv_keys.len()
1263 }
1264
1265 pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1266 &self.config
1267 }
1268
1269 pub fn set_builder_urls(&mut self, builder_urls: vec1::Vec1<Url>) {
1270 self.config.builder_urls = builder_urls;
1271 }
1272
1273 pub fn builder_port(&self) -> Option<u16> {
1274 self.builder_port
1275 }
1276
1277 pub fn signer(&self) -> LocalSigner<SigningKey> {
1278 self.signer.clone()
1279 }
1280
1281 pub fn l1_url(&self) -> Url {
1282 self.l1_url.clone()
1283 }
1284
1285 pub fn anvil(&self) -> Option<&AnvilFillProvider> {
1286 self.anvil_provider.as_ref()
1287 }
1288
1289 pub fn get_upgrade_map(&self) -> UpgradeMap {
1290 self.upgrades.clone().into()
1291 }
1292
1293 pub fn upgrades(&self) -> BTreeMap<Version, Upgrade> {
1294 self.upgrades.clone()
1295 }
1296
1297 pub fn staking_priv_keys(&self) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
1298 staking_priv_keys(&self.priv_keys, &self.state_key_pairs, self.num_nodes())
1299 }
1300
1301 pub fn validator_providers(
1302 &self,
1303 ) -> Vec<(Address, impl Provider + Clone + use<NUM_NODES>)> {
1304 self.staking_priv_keys()
1305 .into_iter()
1306 .map(|(signer, ..)| {
1307 (
1308 signer.address(),
1309 ProviderBuilder::new()
1310 .wallet(EthereumWallet::from(signer))
1311 .connect_http(self.l1_url.clone()),
1312 )
1313 })
1314 .collect()
1315 }
1316
1317 pub async fn init_nodes(
1318 &self,
1319 upgrade: versions::Upgrade,
1320 ) -> Vec<SequencerContext<network::Memory, NoStorage>> {
1321 join_all((0..self.num_nodes()).map(|i| async move {
1322 self.init_node(
1323 i,
1324 ValidatedState::default(),
1325 no_storage::Options,
1326 Some(NullStateCatchup::default()),
1327 None,
1328 &NoMetrics,
1329 STAKE_TABLE_CAPACITY_FOR_TEST,
1330 NullEventConsumer,
1331 upgrade,
1332 Default::default(),
1333 )
1334 .await
1335 }))
1336 .await
1337 }
1338
1339 pub fn known_nodes_with_stake(&self) -> &[PeerConfig<SeqTypes>] {
1340 &self.config.known_nodes_with_stake
1341 }
1342
1343 #[allow(clippy::too_many_arguments)]
1344 pub async fn init_node<P: PersistenceOptions>(
1345 &self,
1346 i: usize,
1347 mut state: ValidatedState,
1348 mut persistence_opt: P,
1349 state_peers: Option<impl StateCatchup + 'static>,
1350 storage: Option<RequestResponseStorage>,
1351 metrics: &dyn Metrics,
1352 stake_table_capacity: usize,
1353 event_consumer: impl EventConsumer + 'static,
1354 upgrade: versions::Upgrade,
1355 upgrades: BTreeMap<Version, Upgrade>,
1356 ) -> SequencerContext<network::Memory, P::Persistence> {
1357 let config = self.config.clone();
1358 let my_peer_config = &config.known_nodes_with_stake[i];
1359 let is_da = config.known_da_nodes.contains(my_peer_config);
1360
1361 let validator_config = ValidatorConfig {
1363 public_key: my_peer_config.stake_table_entry.stake_key,
1364 private_key: self.priv_keys[i].clone(),
1365 stake_value: my_peer_config.stake_table_entry.stake_amount,
1366 state_public_key: self.state_key_pairs[i].ver_key(),
1367 state_private_key: self.state_key_pairs[i].sign_key(),
1368 is_da,
1369 x25519_keypair: None,
1370 p2p_addr: None,
1371 };
1372
1373 let topics = if is_da {
1374 vec![Topic::Global, Topic::Da]
1375 } else {
1376 vec![Topic::Global]
1377 };
1378
1379 let network = Arc::new(MemoryNetwork::new(
1380 &my_peer_config.stake_table_entry.stake_key,
1381 &self.master_map,
1382 &topics,
1383 None,
1384 ));
1385
1386 let builder_account = Self::builder_key().fee_account();
1388 tracing::info!(%builder_account, "prefunding builder account");
1389 state.prefund_account(builder_account, U256::MAX.into());
1390
1391 let persistence = persistence_opt.create().await.unwrap();
1392
1393 let chain_config = state.chain_config.resolve().unwrap_or_default();
1394
1395 let catchup_providers = ParallelStateCatchup::new(&[], Duration::from_secs(5));
1397
1398 if let Some(state_peers) = state_peers {
1400 catchup_providers.add_provider(Arc::new(state_peers));
1401 }
1402
1403 match persistence
1405 .clone()
1406 .into_catchup_provider(BackoffParams::default())
1407 {
1408 Ok(local_catchup) => {
1409 catchup_providers.add_provider(local_catchup);
1410 },
1411 Err(e) => {
1412 tracing::warn!(
1413 "Failed to create local catchup provider: {e:#}. Only using remote \
1414 catchup."
1415 );
1416 },
1417 };
1418
1419 let l1_client = self
1420 .l1_opt
1421 .clone()
1422 .connect(vec![self.l1_url.clone()])
1423 .expect("failed to create L1 client");
1424 l1_client.spawn_tasks().await;
1425
1426 let fetcher = Fetcher::new(
1427 Arc::new(catchup_providers.clone()),
1428 Arc::new(Mutex::new(persistence.clone())),
1429 l1_client.clone(),
1430 chain_config,
1431 );
1432 fetcher.spawn_update_loop().await;
1433
1434 let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
1435 let mut membership = EpochCommittees::new_stake(
1436 config.known_nodes_with_stake.clone(),
1437 config.known_da_nodes.clone(),
1438 block_reward,
1439 fetcher,
1440 config.epoch_height,
1441 );
1442 membership.reload_stake(50).await;
1443
1444 let membership = Arc::new(RwLock::new(membership));
1445 let persistence = Arc::new(persistence);
1446
1447 let coordinator = EpochMembershipCoordinator::new(
1448 membership,
1449 config.epoch_height,
1450 &persistence.clone(),
1451 );
1452
1453 let node_state = NodeState::new(
1454 i as u64,
1455 chain_config,
1456 l1_client,
1457 Arc::new(catchup_providers.clone()),
1458 upgrade.base,
1459 coordinator.clone(),
1460 upgrade.base,
1461 )
1462 .with_current_version(upgrade.base)
1463 .with_genesis(state)
1464 .with_epoch_height(config.epoch_height)
1465 .with_upgrades(upgrades)
1466 .with_epoch_start_block(config.epoch_start_block);
1467
1468 tracing::info!(
1469 i,
1470 key = %my_peer_config.stake_table_entry.stake_key,
1471 state_key = %my_peer_config.state_ver_key,
1472 "starting node",
1473 );
1474
1475 SequencerContext::init(
1476 NetworkConfig {
1477 config,
1478 ..Default::default()
1481 },
1482 upgrade,
1483 validator_config,
1484 coordinator,
1485 node_state,
1486 storage,
1487 catchup_providers,
1488 persistence,
1489 network,
1490 self.state_relay_url.clone(),
1491 metrics,
1492 stake_table_capacity,
1493 event_consumer,
1494 Default::default(),
1495 )
1496 .await
1497 .unwrap()
1498 }
1499
1500 pub fn builder_key() -> EthKeyPair {
1501 FeeAccount::generated_from_seed_indexed([1; 32], 0).1
1502 }
1503 }
1504
1505 pub async fn wait_for_decide_on_handle(
1508 events: &mut (impl Stream<Item = Event> + Unpin),
1509 submitted_txn: &Transaction,
1510 ) -> (u64, usize) {
1511 let commitment = submitted_txn.commit();
1512
1513 loop {
1515 let event = events.next().await.unwrap();
1516 tracing::info!("Received event from handle: {event:?}");
1517
1518 if let Decide { leaf_chain, .. } = event.event {
1519 if let Some((height, size)) =
1520 leaf_chain.iter().find_map(|LeafInfo { leaf, .. }| {
1521 if leaf
1522 .block_payload()
1523 .as_ref()?
1524 .transaction_commitments(leaf.block_header().metadata())
1525 .contains(&commitment)
1526 {
1527 let size = leaf.block_payload().unwrap().encode().len();
1528 Some((leaf.block_header().block_number(), size))
1529 } else {
1530 None
1531 }
1532 })
1533 {
1534 tracing::info!(height, "transaction {commitment} sequenced");
1535 return (height, size);
1536 }
1537 } else {
1538 }
1540 }
1541 }
1542
1543 pub async fn wait_for_epochs(
1546 events: &mut (
1547 impl futures::Stream<Item = hotshot_types::event::Event<SeqTypes>> + std::marker::Unpin
1548 ),
1549 epoch_height: u64,
1550 target_epoch: u64,
1551 ) {
1552 tracing::info!(target_epoch, "waiting for epoch");
1553 while let Some(event) = events.next().await {
1554 if let EventType::Decide { leaf_chain, .. } = event.event {
1555 let leaf = leaf_chain[0].leaf.clone();
1556 let epoch = leaf.epoch(epoch_height);
1557 tracing::debug!(
1558 "Node decided at height: {}, epoch: {epoch:?}",
1559 leaf.height(),
1560 );
1561
1562 if epoch > Some(EpochNumber::new(target_epoch)) {
1563 break;
1564 }
1565 }
1566 }
1567 tracing::info!(target_epoch, "epoch started");
1568 }
1569}
1570
#[cfg(test)]
mod test {
    use alloy::node_bindings::Anvil;
    use espresso_types::{Header, MOCK_SEQUENCER_VERSIONS, NamespaceId, Payload, Transaction};
    use futures::StreamExt;
    use hotshot::types::EventType::Decide;
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::{
        event::LeafInfo,
        traits::block_contents::{BlockHeader, BlockPayload},
    };
    use testing::{TestConfigBuilder, wait_for_decide_on_handle};

    use self::testing::run_test_builder;
    use super::*;

    /// Starts a five-node network with a test builder, submits a single
    /// transaction through the first node and waits for it to be decided.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_skeleton_instantiation() {
        const NUM_NODES: usize = 5;

        let anvil = Anvil::new().spawn();
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(anvil.endpoint_url())
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;
        let first_node = &handles[0];

        // The builder gets its event stream first, then the test takes its own.
        builder_task.start(Box::new(first_node.event_stream().await));
        let mut events = first_node.event_stream().await;

        for handle in &handles {
            handle.start_consensus().await;
        }

        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3]);
        first_node
            .submit_transaction(txn.clone())
            .await
            .expect("Failed to submit transaction");
        tracing::info!("Submitted transaction to handle: {txn:?}");

        wait_for_decide_on_handle(&mut events, &txn).await;
    }

    /// Runs a five-node network until height 30 and checks, for each decided
    /// header against its predecessor, that the height increases by exactly one
    /// and that the timestamp, L1 head and L1 finalized block never go
    /// backwards.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_header_invariants() {
        const NUM_NODES: usize = 5;
        let success_height = 30;

        let anvil = Anvil::new().spawn();
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(anvil.endpoint_url())
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;
        let first_node = &handles[0];

        // The test takes its event stream first, then the builder gets one.
        let mut events = first_node.event_stream().await;
        builder_task.start(Box::new(first_node.event_stream().await));

        for handle in &handles {
            handle.start_consensus().await;
        }

        // Seed the comparison with a locally built genesis header; it is
        // replaced when a decided header of height 0 is seen.
        let (genesis_payload, genesis_ns_table) =
            Payload::from_transactions([], &ValidatedState::default(), &NodeState::mock())
                .await
                .unwrap();
        let genesis_state = NodeState::mock();
        let mut parent = Header::genesis(
            &genesis_state,
            genesis_payload,
            &genesis_ns_table,
            TEST_VERSIONS.test.base,
        );

        while parent.height() < success_height {
            let event = events.next().await.unwrap();
            tracing::info!("Received event from handle: {event:?}");
            let leaf_chain = match event.event {
                Decide { leaf_chain, .. } => leaf_chain,
                _ => continue,
            };
            tracing::info!("Got decide {leaf_chain:?}");

            // The chain is walked in reverse so headers are checked in
            // ascending height order.
            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
                let header = leaf.block_header().clone();
                // A height-0 header has no predecessor to compare against.
                if header.height() != 0 {
                    assert_eq!(header.height(), parent.height() + 1);
                    assert!(header.timestamp() >= parent.timestamp());
                    assert!(header.l1_head() >= parent.l1_head());
                    assert!(header.l1_finalized() >= parent.l1_finalized());
                }
                parent = header;
            }
        }
    }
}