//! espresso_node/lib.rs — crate root for the Espresso sequencer node library.

1mod external_event_handler;
2mod message_compat_tests;
3mod proposal_fetcher;
4mod request_response;
5
6pub mod api;
7pub mod catchup;
8pub mod context;
9pub mod genesis;
10pub mod keyset;
11pub mod network;
12pub mod options;
13pub mod persistence;
14pub mod run;
15pub mod state;
16pub mod state_cert;
17pub mod state_signature;
18pub mod util;
19
20use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration};
21
22use alloy::primitives::U256;
23use anyhow::Context;
24use async_lock::{Mutex, RwLock};
25use catchup::{ParallelStateCatchup, StatePeers};
26use context::SequencerContext;
27use derivative::Derivative;
28use espresso_types::{
29    BackoffParams, EpochCommittees, EpochRewardsCalculator, L1ClientOptions, NodeState, PubKey,
30    SeqTypes, ValidatedState,
31    traits::{EventConsumer, MembershipPersistence},
32    v0::traits::SequencerPersistence,
33    v0_3::Fetcher,
34};
35pub use genesis::Genesis;
36use genesis::L1Finalized;
37use hotshot::{
38    traits::implementations::{
39        CdnMetricsValue, CdnTopic, Cliquenet, CombinedNetworks, CompatNetwork, GossipConfig,
40        KeyPair, Libp2pNetwork, MemoryNetwork, PushCdnNetwork, RequestResponseConfig,
41        WrappedSignatureKey, derive_libp2p_multiaddr, derive_libp2p_peer_id,
42    },
43    types::SignatureKey,
44};
45use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
46use hotshot_orchestrator::client::{OrchestratorClient, get_complete_config};
47use hotshot_types::{
48    ValidatorConfig,
49    addr::NetAddr,
50    data::ViewNumber,
51    epoch_membership::EpochMembershipCoordinator,
52    light_client::{StateKeyPair, StateSignKey},
53    signature_key::{BLSPrivKey, BLSPubKey},
54    traits::{
55        metrics::{Metrics, NoMetrics},
56        network::ConnectedNetwork,
57        node_implementation::{NodeImplementation, NodeType},
58        storage::Storage,
59    },
60    utils::BuilderCommitment,
61    x25519,
62};
63use libp2p::Multiaddr;
64use moka::future::Cache;
65use network::libp2p::split_off_peer_id;
66use options::Identity;
67pub use options::Options;
68use proposal_fetcher::ProposalFetcherConfig;
69pub use run::main;
70use serde::{Deserialize, Serialize};
71use tokio::select;
72use tracing::info;
73use url::Url;
74use vbs::version::StaticVersion;
75
76use crate::request_response::data_source::Storage as RequestResponseStorage;
77
/// Maximum number of recent stake tables to reload for membership on startup
/// (passed to `reload_stake` in [`init_node`]).
pub const RECENT_STAKE_TABLES_LIMIT: u64 = 20;
79
/// The Sequencer node is generic over the hotshot CommChannel.
///
/// `N` is the network implementation and `P` the persistence backend. The
/// `PhantomData<fn(&N, &P)>` marker ties the type parameters to the struct
/// without owning (or ever holding) values of those types — a `fn` pointer is
/// always `Send + Sync` — which is why the `derivative` derives below can use
/// `bound = ""` to avoid requiring `N: Copy`, `P: Debug`, etc. on the impls.
#[derive(Derivative, Serialize, Deserialize)]
#[derivative(
    Copy(bound = ""),
    Debug(bound = ""),
    Default(bound = ""),
    PartialEq(bound = ""),
    Eq(bound = ""),
    Hash(bound = "")
)]
pub struct Node<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(PhantomData<fn(&N, &P)>);
91
92// Using derivative to derive Clone triggers the clippy lint
93// https://rust-lang.github.io/rust-clippy/master/index.html#/incorrect_clone_impl_on_copy_type
94impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Clone for Node<N, P> {
95    fn clone(&self) -> Self {
96        *self
97    }
98}
99
/// Static version tag (v0.1) used for sequencer API clients (e.g. the
/// [`StatePeers`] catchup/config fetchers in this module).
pub type SequencerApiVersion = StaticVersion<0, 1>;
101
/// Wire the node into HotShot: `N` is the communication network and the
/// persistence layer `P` (behind an `Arc`) doubles as the consensus storage.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> NodeImplementation<SeqTypes>
    for Node<N, P>
{
    type Network = N;
    type Storage = Arc<P>;
}
108
/// Networking-related configuration for a sequencer node.
///
/// Consumed by [`init_node`] to set up the CDN, Libp2p and Cliquenet
/// networks, catchup peers, and the state relay connection.
#[derive(Clone, Debug)]
pub struct NetworkParams {
    /// The address where a CDN marshal is located
    pub cdn_endpoint: String,
    /// The URL of the orchestrator, used to fetch the network config when
    /// starting a fresh network
    pub orchestrator_url: Url,
    /// The URL of the state relay server that state signatures are sent to
    pub state_relay_server_url: Url,

    /// The URLs of the builders to use for submitting transactions
    pub builder_urls: Vec<Url>,

    /// The node's private BLS staking key (also used to derive the Libp2p identity)
    pub private_staking_key: BLSPrivKey,
    /// The node's private key for light-client state signatures
    pub private_state_key: StateSignKey,
    /// URLs of peers to use for state catchup
    pub state_peers: Vec<Url>,
    /// Optional peers to fetch the network config from when rejoining an
    /// already-running network (takes precedence over the orchestrator)
    pub config_peers: Option<Vec<Url>>,
    /// Backoff parameters for catchup requests to peers
    pub catchup_backoff: BackoffParams,
    /// Base timeout for catchup requests to peers.
    pub catchup_base_timeout: Duration,
    /// Timeout for local catchup provider requests.
    pub local_catchup_timeout: Duration,
    /// The address to advertise as our public API's URL
    pub public_api_url: Option<Url>,
    /// Cliquenet network address.
    pub cliquenet_bind_addr: NetAddr,
    /// X25519 secret key.
    pub x25519_secret_key: x25519::SecretKey,
    /// The address to send to other Libp2p nodes to contact us
    pub libp2p_advertise_address: String,
    /// The address to bind to for Libp2p
    pub libp2p_bind_address: String,
    /// The (optional) bootstrap node addresses for Libp2p. If supplied, these will
    /// override the bootstrap nodes specified in the config file.
    pub libp2p_bootstrap_nodes: Option<Vec<Multiaddr>>,

    /// The heartbeat interval
    pub libp2p_heartbeat_interval: Duration,

    /// The number of past heartbeats to gossip about
    pub libp2p_history_gossip: usize,
    /// The number of past heartbeats to remember the full messages for
    pub libp2p_history_length: usize,

    /// The target number of peers in the mesh
    pub libp2p_mesh_n: usize,
    /// The maximum number of peers in the mesh
    pub libp2p_mesh_n_high: usize,
    /// The minimum number of peers in the mesh
    pub libp2p_mesh_n_low: usize,
    /// The minimum number of mesh peers that must be outbound
    pub libp2p_mesh_outbound_min: usize,

    /// The maximum gossip message size
    pub libp2p_max_gossip_transmit_size: usize,

    /// The maximum direct message size
    pub libp2p_max_direct_transmit_size: u64,

    /// The maximum number of message IDs to include in (and accept within) a
    /// single IHAVE gossip message
    pub libp2p_max_ihave_length: usize,

    /// The maximum number of IHAVE messages to accept from a Libp2p peer within a heartbeat
    pub libp2p_max_ihave_messages: usize,

    /// The time period that message hashes are stored in the cache
    pub libp2p_published_message_ids_cache_time: Duration,

    /// The time to wait for a Libp2p message requested through IWANT following an IHAVE advertisement
    pub libp2p_iwant_followup_time: Duration,

    /// The maximum number of Libp2p messages we will process in a given RPC
    pub libp2p_max_messages_per_rpc: Option<usize>,

    /// How many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them
    pub libp2p_gossip_retransmission: u32,

    /// If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score
    pub libp2p_flood_publish: bool,

    /// The time period that Libp2p message hashes are stored in the cache
    pub libp2p_duplicate_cache_time: Duration,

    /// Time to live for Libp2p fanout peers
    pub libp2p_fanout_ttl: Duration,

    /// Initial delay in each Libp2p heartbeat
    pub libp2p_heartbeat_initial_delay: Duration,

    /// How many Libp2p peers we will emit gossip to at each heartbeat
    pub libp2p_gossip_factor: f64,

    /// Minimum number of Libp2p peers to emit gossip to during a heartbeat
    pub libp2p_gossip_lazy: usize,
}
201
/// Configuration for connecting to the L1 chain.
pub struct L1Params {
    /// RPC URLs of the L1 providers to connect to
    pub urls: Vec<Url>,
    /// Additional options (metrics, timeouts, ...) for the L1 client
    pub options: L1ClientOptions,
}
206
/// Initialize a production sequencer node and return its running context.
///
/// Performs the full startup sequence, in order:
/// 1. expose build, identity and key information via the metrics/status API;
/// 2. derive the node's networking keys and addresses;
/// 3. load the network config — from local persistence if present, otherwise
///    from `config_peers`, otherwise from the orchestrator;
/// 4. apply genesis/CLI overrides to the config;
/// 5. connect the CDN, Libp2p and Cliquenet networks;
/// 6. set up the L1 client, catchup providers, membership and coordinator;
/// 7. build the [`NodeState`] and construct the [`SequencerContext`].
///
/// # Errors
///
/// Returns an error if any step of the startup sequence fails, e.g. address
/// derivation, config fetching/saving, network creation, or L1 client setup.
#[allow(clippy::too_many_arguments)]
pub async fn init_node<P>(
    genesis: Genesis,
    network_params: NetworkParams,
    metrics: Box<dyn Metrics>,
    mut persistence: P,
    l1_params: L1Params,
    storage: Option<RequestResponseStorage>,
    event_consumer: impl EventConsumer + 'static,
    is_da: bool,
    identity: Identity,
    proposal_fetcher_config: ProposalFetcherConfig,
) -> anyhow::Result<SequencerContext<network::Production, P>>
where
    P: SequencerPersistence + MembershipPersistence + DhtPersistentStorage,
    Arc<P>: Storage<SeqTypes>,
{
    // Expose git information via status API.
    let info = espresso_utils::build_info!();
    metrics
        .text_family(
            "version".into(),
            vec!["rev".into(), "desc".into(), "timestamp".into()],
        )
        .create(vec![
            info.git_sha.into(),
            info.git_describe.into(),
            info.git_commit_timestamp.into(),
        ]);

    metrics
        .text_family(
            "build_info".into(),
            vec![
                "modified".into(),
                "branch".into(),
                "debug".into(),
                "features".into(),
            ],
        )
        .create(vec![
            info.git_dirty.into(),
            info.git_branch.into(),
            info.is_debug.to_string(),
            env!("VERGEN_CARGO_FEATURES").into(),
        ]);

    // Expose Node Entity Information via the status/metrics API
    metrics
        .text_family(
            "node_identity_general".into(),
            vec![
                "name".into(),
                "description".into(),
                "company_name".into(),
                "company_website".into(),
                "operating_system".into(),
                "node_type".into(),
                "network_type".into(),
            ],
        )
        .create(vec![
            identity.node_name.unwrap_or_default(),
            identity.node_description.unwrap_or_default(),
            identity.company_name.unwrap_or_default(),
            identity
                .company_website
                .map(|u| u.into())
                .unwrap_or_default(),
            identity.operating_system.unwrap_or_default(),
            identity.node_type.unwrap_or_default(),
            identity.network_type.unwrap_or_default(),
        ]);

    // Expose Node Identity Location via the status/metrics API
    metrics
        .text_family(
            "node_identity_location".into(),
            vec!["country".into(), "latitude".into(), "longitude".into()],
        )
        .create(vec![
            identity.country_code.unwrap_or_default(),
            identity.latitude.map(|l| l.to_string()).unwrap_or_default(),
            identity
                .longitude
                .map(|l| l.to_string())
                .unwrap_or_default(),
        ]);

    // Expose icons for node dashboard via the status/metrics API
    metrics
        .text_family(
            "node_identity_icon".into(),
            vec![
                "small_1x".into(),
                "small_2x".into(),
                "small_3x".into(),
                "large_1x".into(),
                "large_2x".into(),
                "large_3x".into(),
            ],
        )
        .create(vec![
            identity
                .icon_14x14_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
        ]);

    // Stick our public key in `metrics` so it is easily accessible via the status API.
    let pub_key = BLSPubKey::from_private(&network_params.private_staking_key);
    metrics
        .text_family("node".into(), vec!["key".into()])
        .create(vec![pub_key.to_string()]);

    // Parse the Libp2p bind and advertise addresses to multiaddresses
    let libp2p_bind_address = derive_libp2p_multiaddr(&network_params.libp2p_bind_address)
        .with_context(|| {
            format!(
                "Failed to derive Libp2p bind address of {}",
                &network_params.libp2p_bind_address
            )
        })?;
    let libp2p_advertise_address =
        derive_libp2p_multiaddr(&network_params.libp2p_advertise_address).with_context(|| {
            format!(
                "Failed to derive Libp2p advertise address of {}",
                &network_params.libp2p_advertise_address
            )
        })?;

    info!("Libp2p bind address: {}", libp2p_bind_address);
    info!("Libp2p advertise address: {}", libp2p_advertise_address);

    // Orchestrator client
    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
    let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);
    // Bundle the node's keys and network identity into its validator config.
    let validator_config = ValidatorConfig {
        public_key: pub_key,
        private_key: network_params.private_staking_key,
        stake_value: U256::ONE,
        state_public_key: state_key_pair.ver_key(),
        state_private_key: state_key_pair.sign_key(),
        is_da,
        x25519_keypair: Some(x25519::Keypair::from(&network_params.x25519_secret_key)),
        p2p_addr: Some(network_params.cliquenet_bind_addr.clone()),
    };

    // Derive our Libp2p public key from our private key
    let libp2p_public_key = derive_libp2p_peer_id::<<SeqTypes as NodeType>::SignatureKey>(
        &validator_config.private_key,
    )
    .with_context(|| "Failed to derive Libp2p peer ID")?;

    // Print the libp2p public key
    info!("Starting Libp2p with PeerID: {libp2p_public_key}");

    // Load the network config: prefer local persistence, then config peers,
    // then the orchestrator (fresh network). `wait_for_orchestrator` is only
    // true in the fresh-network case.
    let loaded_network_config_from_persistence = persistence.load_config().await?;
    let (mut network_config, wait_for_orchestrator) = match (
        loaded_network_config_from_persistence,
        network_params.config_peers,
    ) {
        (Some(config), _) => {
            tracing::warn!("loaded network config from storage, rejoining existing network");
            (config, false)
        },
        // If we were told to fetch the config from an already-started peer, do so.
        (None, Some(peers)) => {
            tracing::warn!(?peers, "loading network config from peers");
            let peers = StatePeers::<SequencerApiVersion>::from_urls(
                peers,
                network_params.catchup_backoff,
                network_params.catchup_base_timeout,
                &NoMetrics,
            );
            let config = peers.fetch_config(validator_config.clone()).await?;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            persistence.save_config(&config).await?;
            (config, false)
        },
        // Otherwise, this is a fresh network; load from the orchestrator.
        (None, None) => {
            tracing::warn!("loading network config from orchestrator");
            tracing::warn!(
                "waiting for other nodes to connect, DO NOT RESTART until fully connected"
            );
            let config = get_complete_config(
                &orchestrator_client,
                validator_config.clone(),
                // Register in our Libp2p advertise address and public key so other nodes
                // can contact us on startup
                Some(libp2p_advertise_address),
                Some(libp2p_public_key),
            )
            .await?
            .0;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            persistence.save_config(&config).await?;
            tracing::warn!("all nodes connected");
            (config, true)
        },
    };

    // Apply any HotShot config overrides defined by the genesis upgrade entry.
    if let Some(upgrade) = genesis.upgrades.get(&genesis.upgrade_version) {
        upgrade.set_hotshot_config_parameters(&mut network_config.config);
    }

    // Override the builder URLs in the network config with the ones from the command line
    // if any were provided
    if !network_params.builder_urls.is_empty() {
        // NOTE(review): this `unwrap` panics if the URL list cannot be
        // converted; consider surfacing it as an error with context instead.
        network_config.config.builder_urls = network_params.builder_urls.try_into().unwrap();
    }

    // Pull epoch/DRB/stake-table parameters from genesis (falling back to defaults).
    let epoch_height = genesis.epoch_height.unwrap_or_default();
    let drb_difficulty = genesis.drb_difficulty.unwrap_or_default();
    let drb_upgrade_difficulty = genesis.drb_upgrade_difficulty.unwrap_or_default();
    let epoch_start_block = genesis.epoch_start_block.unwrap_or_default();
    let stake_table_capacity = genesis
        .stake_table_capacity
        .unwrap_or(hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY);

    let version_upgrade = versions::Upgrade::new(genesis.base_version, genesis.upgrade_version);

    tracing::warn!("setting epoch_height={epoch_height:?}");
    tracing::warn!("setting drb_difficulty={drb_difficulty:?}");
    tracing::warn!("setting drb_upgrade_difficulty={drb_upgrade_difficulty:?}");
    tracing::warn!("setting epoch_start_block={epoch_start_block:?}");
    tracing::warn!("setting stake_table_capacity={stake_table_capacity:?}");
    tracing::warn!("setting version_upgrade={version_upgrade}");
    network_config.config.epoch_height = epoch_height;
    network_config.config.drb_difficulty = drb_difficulty;
    network_config.config.drb_upgrade_difficulty = drb_upgrade_difficulty;
    network_config.config.epoch_start_block = epoch_start_block;
    network_config.config.stake_table_capacity = stake_table_capacity;

    if let Some(da_committees) = &genesis.da_committees {
        tracing::warn!("setting da_committees from genesis: {da_committees:?}");
        network_config.config.da_committees = da_committees.clone();
    }

    // If the `Libp2p` bootstrap nodes were supplied via the command line, override those
    // present in the config file.
    if let Some(bootstrap_nodes) = network_params.libp2p_bootstrap_nodes {
        if let Some(libp2p_config) = network_config.libp2p_config.as_mut() {
            // If the libp2p configuration is present, we can override the bootstrap nodes.

            // Split off the peer ID from the addresses
            libp2p_config.bootstrap_nodes = bootstrap_nodes
                .into_iter()
                .map(split_off_peer_id)
                .collect::<Result<Vec<_>, _>>()
                .with_context(|| "Failed to parse peer ID from bootstrap node")?;
        } else {
            // If not, don't try launching with them. Eventually we may want to
            // provide a default configuration here instead.
            tracing::warn!("No libp2p configuration found, ignoring supplied bootstrap nodes");
        }
    }

    let node_index = network_config.node_index;

    // If we are a DA node, we need to subscribe to the DA topic
    let topics = {
        let mut topics = vec![CdnTopic::Global];
        if is_da {
            topics.push(CdnTopic::Da);
        }
        topics
    };

    // Initialize the push CDN network (and perform the initial connection)
    let cdn_network = PushCdnNetwork::new(
        network_params.cdn_endpoint,
        topics,
        KeyPair {
            public_key: WrappedSignatureKey(validator_config.public_key),
            private_key: validator_config.private_key.clone(),
        },
        CdnMetricsValue::new(&*metrics),
    )
    .with_context(|| format!("Failed to create CDN network {node_index}"))?;

    // Configure gossipsub based on the command line options
    let gossip_config = GossipConfig {
        heartbeat_interval: network_params.libp2p_heartbeat_interval,
        history_gossip: network_params.libp2p_history_gossip,
        history_length: network_params.libp2p_history_length,
        mesh_n: network_params.libp2p_mesh_n,
        mesh_n_high: network_params.libp2p_mesh_n_high,
        mesh_n_low: network_params.libp2p_mesh_n_low,
        mesh_outbound_min: network_params.libp2p_mesh_outbound_min,
        max_ihave_messages: network_params.libp2p_max_ihave_messages,
        max_transmit_size: network_params.libp2p_max_gossip_transmit_size,
        max_ihave_length: network_params.libp2p_max_ihave_length,
        published_message_ids_cache_time: network_params.libp2p_published_message_ids_cache_time,
        iwant_followup_time: network_params.libp2p_iwant_followup_time,
        max_messages_per_rpc: network_params.libp2p_max_messages_per_rpc,
        gossip_retransmission: network_params.libp2p_gossip_retransmission,
        flood_publish: network_params.libp2p_flood_publish,
        duplicate_cache_time: network_params.libp2p_duplicate_cache_time,
        fanout_ttl: network_params.libp2p_fanout_ttl,
        heartbeat_initial_delay: network_params.libp2p_heartbeat_initial_delay,
        gossip_factor: network_params.libp2p_gossip_factor,
        gossip_lazy: network_params.libp2p_gossip_lazy,
    };

    // Configure request/response based on the command line options
    let request_response_config = RequestResponseConfig {
        request_size_maximum: network_params.libp2p_max_direct_transmit_size,
        response_size_maximum: network_params.libp2p_max_direct_transmit_size,
    };

    let l1_client = l1_params
        .options
        .with_metrics(&*metrics)
        .connect(l1_params.urls)
        .with_context(|| "failed to create L1 client")?;

    info!("Validating fee contract");

    genesis.validate_fee_contract(&l1_client).await?;

    info!("Fee contract validated. Spawning L1 tasks");

    l1_client.spawn_tasks().await;

    info!(
        "L1 tasks spawned. Waiting for L1 genesis: {:?}",
        genesis.l1_finalized
    );

    // Resolve the L1 genesis block, waiting for finalization when the genesis
    // file specifies it by number or timestamp rather than as a literal block.
    let l1_genesis = match genesis.l1_finalized {
        L1Finalized::Block(b) => b,
        L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
        L1Finalized::Timestamp { timestamp } => {
            l1_client
                .wait_for_finalized_block_with_timestamp(U256::from(timestamp.unix_timestamp()))
                .await
        },
    };

    info!("L1 genesis found: {:?}", l1_genesis);

    let genesis_chain_config = genesis.header.chain_config;
    let mut genesis_state = ValidatedState {
        chain_config: genesis_chain_config.into(),
        ..Default::default()
    };
    for (address, amount) in genesis.accounts {
        tracing::warn!(%address, %amount, "Prefunding account for demo");
        genesis_state.prefund_account(address, amount);
    }

    // Create the list of parallel catchup providers
    let state_catchup_providers =
        ParallelStateCatchup::new(&[], network_params.local_catchup_timeout);

    // Add the state peers to the list
    let state_peers = StatePeers::<SequencerApiVersion>::from_urls(
        network_params.state_peers,
        network_params.catchup_backoff,
        network_params.catchup_base_timeout,
        &*metrics,
    );
    state_catchup_providers.add_provider(Arc::new(state_peers));

    // Add the local (persistence) catchup provider to the list (if we can)
    match persistence
        .clone()
        .into_catchup_provider(network_params.catchup_backoff)
    {
        Ok(catchup) => {
            state_catchup_providers.add_provider(Arc::new(catchup));
        },
        Err(e) => {
            // Best-effort: remote catchup alone is sufficient to operate.
            tracing::warn!(
                "Failed to create local catchup provider: {e:#}. Only using remote catchup."
            );
        },
    };

    // Expose persistence-layer metrics via the status API.
    persistence.enable_metrics(&*metrics);

    let fetcher = Fetcher::new(
        Arc::new(state_catchup_providers.clone()),
        Arc::new(Mutex::new(persistence.clone())),
        l1_client.clone(),
        genesis.chain_config,
    );

    info!("Spawning update loop");

    fetcher.spawn_update_loop().await;
    info!("Update loop spawned. Fetching block reward");

    // Best-effort fetch: `None` if the fixed block reward is unavailable.
    let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
    info!("Block reward fetched: {:?}", block_reward);
    // Create the HotShot membership
    let mut membership = EpochCommittees::new_stake(
        network_config.config.known_nodes_with_stake.clone(),
        network_config.config.known_da_nodes.clone(),
        block_reward,
        fetcher,
        epoch_height,
    );
    info!("Membership created. Reloading stake");
    membership.reload_stake(RECENT_STAKE_TABLES_LIMIT).await;
    info!("Stake reloaded");

    let membership: Arc<RwLock<EpochCommittees>> = Arc::new(RwLock::new(membership));
    let persistence = Arc::new(persistence);
    let coordinator = EpochMembershipCoordinator::new(
        membership,
        network_config.config.epoch_height,
        // NOTE(review): `persistence.clone()` looks redundant here — `&persistence`
        // may suffice; confirm against `EpochMembershipCoordinator::new`'s signature.
        &persistence.clone(),
    );

    let epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));

    let instance_state = NodeState {
        chain_config: genesis.chain_config,
        genesis_chain_config,
        l1_client,
        genesis_header: genesis.header,
        genesis_state,
        l1_genesis: Some(l1_genesis),
        node_id: node_index,
        upgrades: genesis.upgrades,
        current_version: genesis.base_version,
        epoch_height: Some(epoch_height),
        state_catchup: Arc::new(state_catchup_providers.clone()),
        coordinator: coordinator.clone(),
        genesis_version: genesis.genesis_version,
        epoch_start_block: genesis.epoch_start_block.unwrap_or_default(),
        epoch_rewards_calculator,
        light_client_contract_address: Cache::builder().max_capacity(1).build(),
        token_contract_address: Cache::builder().max_capacity(1).build(),
        finalized_hotshot_height: Cache::builder()
            .max_capacity(1)
            .time_to_live(Duration::from_secs(30))
            .build(),
    };

    let combined_network = {
        info!("Initializing Libp2p network");
        let p2p_network = Libp2pNetwork::from_config(
            network_config.clone(),
            persistence.clone(),
            gossip_config,
            request_response_config,
            libp2p_bind_address,
            &validator_config.public_key,
            // We need the private key so we can derive our Libp2p keypair
            // (using https://docs.rs/blake3/latest/blake3/fn.derive_key.html)
            &validator_config.private_key,
            hotshot::traits::implementations::Libp2pMetricsValue::new(&*metrics),
        )
        .await
        .with_context(|| {
            format!(
                "Failed to create libp2p network on node {node_index}; binding to {:?}",
                network_params.libp2p_bind_address
            )
        })?;

        info!("Libp2p network initialized");

        // Proceed as soon as EITHER transport is ready; the combined network
        // can fall back between them.
        tracing::warn!("Waiting for at least one connection to be initialized");
        select! {
            _ = cdn_network.wait_for_ready() => {
                tracing::warn!("CDN connection initialized");
            },
            _ = p2p_network.wait_for_ready() => {
                tracing::warn!("P2P connection initialized");
            },
        };

        // Combine the CDN and P2P networks
        CombinedNetworks::new(cdn_network, p2p_network, Some(Duration::from_secs(1)))
    };

    let network = {
        // Build the Cliquenet peer list from the current stake table, keeping
        // only entries that advertise connection info.
        let peers = coordinator
            .stake_table_for_epoch(None)
            .await?
            .stake_table()
            .await
            .0
            .into_iter()
            .filter_map(|cfg| Some((cfg.stake_table_entry.stake_key, cfg.connect_info?)));

        let c = Cliquenet::<PubKey>::create(
            "sequencer",
            pub_key,
            network_params.x25519_secret_key.into(),
            network_params.cliquenet_bind_addr.clone(),
            peers,
            metrics.clone(),
        )
        .await?;

        Arc::new(CompatNetwork::new(c, combined_network).await)
    };

    let mut ctx = SequencerContext::init(
        network_config,
        version_upgrade,
        validator_config,
        coordinator,
        instance_state,
        storage,
        state_catchup_providers,
        persistence,
        network.clone(),
        Some(network_params.state_relay_server_url),
        &*metrics,
        genesis.stake_table.capacity,
        event_consumer,
        proposal_fetcher_config,
    )
    .await?;

    network.set_upgrade_lock(ctx.upgrade_lock().await);

    if wait_for_orchestrator {
        ctx = ctx.wait_for_orchestrator(orchestrator_client);
    }

    Ok(ctx)
}
767
768pub fn empty_builder_commitment() -> BuilderCommitment {
769    BuilderCommitment::from_bytes([])
770}
771
772#[cfg(any(test, feature = "testing"))]
773pub mod testing {
774    use std::{
775        cmp::max,
776        collections::{BTreeMap, HashMap},
777        time::Duration,
778    };
779
780    use alloy::{
781        network::EthereumWallet,
782        node_bindings::{Anvil, AnvilInstance},
783        primitives::{Address, U256},
784        providers::{
785            Provider, ProviderBuilder, RootProvider,
786            fillers::{
787                BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
788            },
789            layers::AnvilProvider,
790        },
791        signers::{
792            k256::ecdsa::SigningKey,
793            local::{LocalSigner, PrivateKeySigner},
794        },
795    };
796    use async_lock::RwLock;
797    use catchup::NullStateCatchup;
798    use committable::Committable;
799    use espresso_contract_deployer::{
800        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
801        network_config::light_client_genesis_from_stake_table,
802    };
803    use espresso_types::{
804        EpochVersion, Event, FeeAccount, L1Client, NetworkConfig, PubKey, SeqTypes, Transaction,
805        Upgrade, UpgradeMap,
806        eth_signature_key::EthKeyPair,
807        v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup},
808    };
809    use futures::{
810        future::join_all,
811        stream::{Stream, StreamExt},
812    };
813    use hotshot::{
814        traits::{
815            BlockPayload,
816            implementations::{MasterMap, MemoryNetwork},
817        },
818        types::EventType::{self, Decide},
819    };
820    use hotshot_builder_refactored::service::{
821        BuilderConfig as LegacyBuilderConfig, GlobalState as LegacyGlobalState,
822    };
823    use hotshot_testing::block_builder::{
824        BuilderTask, SimpleBuilderImplementation, TestBuilderImplementation,
825    };
826    use hotshot_types::{
827        HotShotConfig, PeerConfig,
828        data::EpochNumber,
829        event::LeafInfo,
830        light_client::StateKeyPair,
831        signature_key::BLSKeyPair,
832        traits::{
833            EncodeBytes, block_contents::BlockHeader, metrics::NoMetrics, network::Topic,
834            signature_key::BuilderSignatureKey,
835        },
836    };
837    use rand::SeedableRng as _;
838    use rand_chacha::ChaCha20Rng;
839    use staking_cli::demo::{DelegationConfig, StakingTransactions};
840    use test_utils::reserve_tcp_port;
841    use tokio::spawn;
842    use vbs::version::{StaticVersionType, Version};
843    use versions::EPOCH_VERSION;
844
845    use super::*;
846    use crate::{
847        catchup::ParallelStateCatchup,
848        persistence::no_storage::{self, NoStorage},
849    };
850
    /// Stake table capacity used when deploying the test contracts.
    const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
    /// Capacity of the legacy builder's transaction channel in tests.
    const BUILDER_CHANNEL_CAPACITY_FOR_TEST: usize = 128;
    /// Anvil-wrapped provider with alloy's default fill layers (gas, blob gas,
    /// nonce, chain id) over a `RootProvider`, i.e. the concrete type produced
    /// by `ProviderBuilder::new()` plus the `AnvilProvider` layer.
    type AnvilFillProvider = AnvilProvider<
        FillProvider<
            JoinFill<
                alloy::providers::Identity,
                JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
            >,
            RootProvider,
        >,
    >;
    /// Adapter exposing the legacy builder's global state as a [`BuilderTask`]
    /// so the test harness can drive it from a node's event stream.
    struct LegacyBuilderImplementation {
        // Shared state of the legacy builder service.
        global_state: Arc<LegacyGlobalState<SeqTypes>>,
    }
865
    impl BuilderTask<SeqTypes> for LegacyBuilderImplementation {
        /// Spawns a background task running the legacy builder's event loop
        /// over the supplied HotShot event stream.
        fn start(
            self: Box<Self>,
            stream: Box<
                dyn futures::prelude::Stream<Item = hotshot::types::Event<SeqTypes>>
                    + std::marker::Unpin
                    + Send
                    + 'static,
            >,
        ) {
            spawn(async move {
                let res = self.global_state.start_event_loop(stream).await;
                // The loop is expected to run for the whole test; reaching this
                // line means the builder service stopped, so log loudly.
                tracing::error!(?res, "testing legacy builder service exited");
            });
        }
    }
882
    /// Starts the legacy builder service for a test network of `NUM_NODES`.
    ///
    /// Binds the builder API to `port` (an OS-reserved ephemeral port when
    /// `None`), serves it from a background task, and returns the builder task
    /// (to be hooked up to a node's event stream) plus the builder's URL.
    /// `max_block_size` defaults to 300 bytes when not given.
    pub async fn run_legacy_builder<const NUM_NODES: usize>(
        port: Option<u16>,
        max_block_size: Option<u64>,
    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
        let builder_key_pair = TestConfig::<0>::builder_key();
        let port = match port {
            Some(p) => p,
            None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
        };

        // This should never fail.
        let url: Url = format!("http://localhost:{port}")
            .parse()
            .expect("Failed to parse builder URL");

        // create the global state
        let global_state = LegacyGlobalState::new(
            LegacyBuilderConfig {
                builder_keys: (builder_key_pair.fee_account(), builder_key_pair),
                max_api_waiting_time: Duration::from_secs(1),
                max_block_size_increment_period: Duration::from_secs(60),
                maximize_txn_capture_timeout: Duration::from_millis(100),
                txn_garbage_collect_duration: Duration::from_secs(60),
                txn_channel_capacity: BUILDER_CHANNEL_CAPACITY_FOR_TEST,
                tx_status_cache_capacity: 81920,
                base_fee: 10,
            },
            NodeState::default(),
            max_block_size.unwrap_or(300),
            NUM_NODES,
        );

        // Create and spawn the tide-disco app to serve the builder APIs
        let app = Arc::clone(&global_state)
            .into_app()
            .expect("Failed to create builder tide-disco app");

        spawn(async move {
            app.serve(
                format!("http://0.0.0.0:{port}")
                    .parse::<Url>()
                    .expect("Failed to parse builder listener"),
                EpochVersion::instance(),
            )
            .await
        });

        // Pass on the builder task to be injected in the testing harness
        (Box::new(LegacyBuilderImplementation { global_state }), url)
    }
933
934    pub async fn run_test_builder<const NUM_NODES: usize>(
935        port: Option<u16>,
936    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
937        let port = match port {
938            Some(p) => p,
939            None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
940        };
941
942        // This should never fail.
943        let url: Url = format!("http://localhost:{port}")
944            .parse()
945            .expect("Failed to parse builder URL");
946        tracing::info!("Starting test builder on {url}");
947
948        let task = <SimpleBuilderImplementation as TestBuilderImplementation<SeqTypes>>::start(
949            NUM_NODES,
950            format!("http://0.0.0.0:{port}")
951                .parse()
952                .expect("Failed to parse builder listener"),
953            (),
954            HashMap::new(),
955        )
956        .await;
957
958        (task, url)
959    }
960
    /// Builder for [`TestConfig`]; start from [`Default`] and override fields.
    pub struct TestConfigBuilder<const NUM_NODES: usize> {
        /// Consensus configuration shared by all test nodes.
        config: HotShotConfig<SeqTypes>,
        /// One BLS private key per node (index-aligned with `state_key_pairs`).
        priv_keys: Vec<BLSPrivKey>,
        /// One light-client state key pair per node.
        state_key_pairs: Vec<StateKeyPair>,
        /// Shared map backing the in-memory test network.
        master_map: Arc<MasterMap<PubKey>>,
        /// L1 (Ethereum) RPC endpoint.
        l1_url: Url,
        /// Options for the L1 client.
        l1_opt: L1ClientOptions,
        /// Anvil-backed provider; present when the L1 is a local Anvil node.
        anvil_provider: Option<AnvilFillProvider>,
        /// Signer used for L1 deployments and transactions.
        signer: LocalSigner<SigningKey>,
        /// Optional state relay service URL.
        state_relay_url: Option<Url>,
        /// Optional fixed port for the builder API.
        builder_port: Option<u16>,
        /// Configured protocol upgrades, keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
974
975    pub fn staking_priv_keys(
976        priv_keys: &[BLSPrivKey],
977        state_key_pairs: &[StateKeyPair],
978        num_nodes: usize,
979    ) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
980        let seed = [42u8; 32];
981        let mut rng = ChaCha20Rng::from_seed(seed); // Create a deterministic RNG
982        let eth_key_pairs = (0..num_nodes).map(|_| SigningKey::random(&mut rng).into());
983        eth_key_pairs
984            .zip(priv_keys.iter())
985            .zip(state_key_pairs.iter())
986            .map(|((eth, bls), state)| (eth, bls.clone().into(), state.clone()))
987            .collect()
988    }
989
    impl<const NUM_NODES: usize> TestConfigBuilder<NUM_NODES> {
        /// Sets (or clears) the fixed port for the builder API.
        pub fn builder_port(mut self, builder_port: Option<u16>) -> Self {
            self.builder_port = builder_port;
            self
        }

        /// Sets the state relay service URL.
        pub fn state_relay_url(mut self, url: Url) -> Self {
            self.state_relay_url = Some(url);
            self
        }

        /// Sets the Anvil provider, constructed using the Anvil instance.
        /// Also sets the L1 URL based on the Anvil endpoint.
        /// The `AnvilProvider` can be used to configure the Anvil, for example,
        /// by enabling interval mining after the test network is initialized.
        pub fn anvil_provider(mut self, anvil: AnvilInstance) -> Self {
            self.l1_url = anvil.endpoint().parse().unwrap();
            let l1_client = L1Client::anvil(&anvil).expect("create l1 client");
            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
            self.anvil_provider = Some(anvil_provider);
            self
        }

        /// Sets a custom L1 URL, overriding any previously set Anvil instance URL.
        /// This also drops the Anvil provider, since it is no longer needed.
        pub fn l1_url(mut self, l1_url: Url) -> Self {
            self.anvil_provider = None;
            self.l1_url = l1_url;
            self
        }

        /// Sets the L1 client options.
        pub fn l1_opt(mut self, opt: L1ClientOptions) -> Self {
            self.l1_opt = opt;
            self
        }

        /// Sets the signer used for L1 deployments and transactions.
        pub fn signer(mut self, signer: LocalSigner<SigningKey>) -> Self {
            self.signer = signer;
            self
        }

        /// Installs a pre-built upgrade map and applies the HotShot config
        /// parameters of the upgrade registered for version `v`.
        ///
        /// Panics if `upgrades` contains no entry for `v`.
        pub fn upgrades(mut self, v: Version, upgrades: BTreeMap<Version, Upgrade>) -> Self {
            let upgrade = upgrades.get(&v).unwrap();
            upgrade.set_hotshot_config_parameters(&mut self.config);
            self.upgrades = upgrades;
            self
        }

        /// Version specific upgrade setup. Extend to future upgrades
        /// by adding a branch to the `match` statement.
        ///
        /// For epoch-enabled versions this deploys the full contract suite to
        /// the configured L1 and registers/delegates the validators' stake
        /// before constructing the upgrade. Panics for unconfigured versions.
        pub async fn set_upgrades(mut self, version: Version) -> Self {
            let upgrade = match version {
                version if version >= EPOCH_VERSION => {
                    tracing::debug!(?version, "upgrade version");
                    let blocks_per_epoch = self.config.epoch_height;
                    let epoch_start_block = self.config.epoch_start_block;

                    // Derive the light-client genesis from the current stake table.
                    let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
                        &self.config.hotshot_stake_table(),
                        STAKE_TABLE_CAPACITY_FOR_TEST,
                    )
                    .unwrap();

                    let validators =
                        staking_priv_keys(&self.priv_keys, &self.state_key_pairs, NUM_NODES);

                    let deployer = ProviderBuilder::new()
                        .wallet(EthereumWallet::from(self.signer.clone()))
                        .connect_http(self.l1_url.clone());

                    // Deploy all contracts; the signer is used as admin/proposer/
                    // executor for every timelock and as the pauser multisig.
                    let mut contracts = Contracts::new();
                    let args = DeployerArgsBuilder::default()
                        .deployer(deployer.clone())
                        .rpc_url(self.l1_url.clone())
                        .mock_light_client(true)
                        .genesis_lc_state(genesis_state)
                        .genesis_st_state(genesis_stake)
                        .blocks_per_epoch(blocks_per_epoch)
                        .epoch_start_block(epoch_start_block)
                        // NOTE(review): presumably an over-approximation of an
                        // epoch's duration in seconds (cf. the `_SECONDS` default);
                        // confirm the `blocks_per_epoch * 15 + 100` term's units.
                        .exit_escrow_period(U256::from(max(
                            blocks_per_epoch * 15 + 100,
                            DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
                        )))
                        .multisig_pauser(self.signer.address())
                        .token_name("Espresso".to_string())
                        .token_symbol("ESP".to_string())
                        .initial_token_supply(U256::from(3590000000u64))
                        .ops_timelock_delay(U256::from(0))
                        .ops_timelock_admin(self.signer.address())
                        .ops_timelock_proposers(vec![self.signer.address()])
                        .ops_timelock_executors(vec![self.signer.address()])
                        .safe_exit_timelock_delay(U256::from(10))
                        .safe_exit_timelock_admin(self.signer.address())
                        .safe_exit_timelock_proposers(vec![self.signer.address()])
                        .safe_exit_timelock_executors(vec![self.signer.address()])
                        .build()
                        .unwrap();
                    args.deploy_all(&mut contracts)
                        .await
                        .expect("failed to deploy all contracts");

                    // Register validators and send their staking/delegation txns
                    // against the freshly deployed stake table proxy.
                    let st_addr = contracts
                        .address(Contract::StakeTableProxy)
                        .expect("StakeTableProxy address not found");
                    StakingTransactions::create(
                        self.l1_url.clone(),
                        &deployer,
                        st_addr,
                        validators,
                        None,
                        DelegationConfig::default(),
                    )
                    .await
                    .expect("stake table setup failed")
                    .apply_all()
                    .await
                    .expect("send all txns failed");

                    Upgrade::pos_view_based(st_addr)
                },
                _ => panic!("Upgrade not configured for version {version:?}"),
            };

            // Apply the upgrade's HotShot parameters and record it in the map.
            let mut upgrades = std::collections::BTreeMap::new();
            upgrade.set_hotshot_config_parameters(&mut self.config);
            upgrades.insert(version, upgrade);

            self.upgrades = upgrades;
            self
        }

        /// Sets the epoch height (blocks per epoch).
        pub fn epoch_height(mut self, epoch_height: u64) -> Self {
            self.config.epoch_height = epoch_height;
            self
        }

        /// Sets the block at which epochs start.
        pub fn epoch_start_block(mut self, start_block: u64) -> Self {
            self.config.epoch_start_block = start_block;
            self
        }

        /// Consumes the builder and produces the final [`TestConfig`].
        pub fn build(self) -> TestConfig<NUM_NODES> {
            TestConfig {
                config: self.config,
                priv_keys: self.priv_keys,
                state_key_pairs: self.state_key_pairs,
                master_map: self.master_map,
                l1_url: self.l1_url,
                l1_opt: self.l1_opt,
                signer: self.signer,
                state_relay_url: self.state_relay_url,
                builder_port: self.builder_port,
                upgrades: self.upgrades,
                anvil_provider: self.anvil_provider,
            }
        }

        /// Sets the stake table capacity in the HotShot config.
        pub fn stake_table_capacity(mut self, stake_table_capacity: usize) -> Self {
            self.config.stake_table_capacity = stake_table_capacity;
            self
        }
    }
1152
1153    impl<const NUM_NODES: usize> Default for TestConfigBuilder<NUM_NODES> {
1154        fn default() -> Self {
1155            let num_nodes = NUM_NODES;
1156
1157            // Generate keys for the nodes.
1158            let seed = [0; 32];
1159            let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..num_nodes)
1160                .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed(seed, i as u64))
1161                .unzip();
1162            let state_key_pairs = (0..num_nodes)
1163                .map(|i| StateKeyPair::generate_from_seed_indexed(seed, i as u64))
1164                .collect::<Vec<_>>();
1165            let known_nodes_with_stake = pub_keys
1166                .iter()
1167                .zip(&state_key_pairs)
1168                .map(|(pub_key, state_key_pair)| PeerConfig::<SeqTypes> {
1169                    stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
1170                    state_ver_key: state_key_pair.ver_key(),
1171                    connect_info: None,
1172                })
1173                .collect::<Vec<_>>();
1174
1175            let master_map = MasterMap::new();
1176
1177            let builder_port = reserve_tcp_port().unwrap();
1178
1179            let config: HotShotConfig<SeqTypes> = HotShotConfig {
1180                fixed_leader_for_gpuvid: 0,
1181                num_nodes_with_stake: num_nodes.try_into().unwrap(),
1182                known_da_nodes: known_nodes_with_stake.clone(),
1183                da_committees: Default::default(),
1184                known_nodes_with_stake: known_nodes_with_stake.clone(),
1185                next_view_timeout: Duration::from_secs(5).as_millis() as u64,
1186                num_bootstrap: 1usize,
1187                da_staked_committee_size: num_nodes,
1188                view_sync_timeout: Duration::from_secs(1),
1189                data_request_delay: Duration::from_secs(1),
1190                builder_urls: vec1::vec1![
1191                    Url::parse(&format!("http://127.0.0.1:{builder_port}")).unwrap()
1192                ],
1193                builder_timeout: Duration::from_secs(1),
1194                start_threshold: (
1195                    known_nodes_with_stake.clone().len() as u64,
1196                    known_nodes_with_stake.clone().len() as u64,
1197                ),
1198                start_proposing_view: 0,
1199                stop_proposing_view: 0,
1200                start_voting_view: 0,
1201                stop_voting_view: 0,
1202                start_proposing_time: 0,
1203                start_voting_time: 0,
1204                stop_proposing_time: 0,
1205                stop_voting_time: 0,
1206                epoch_height: 30,
1207                epoch_start_block: 1,
1208                stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
1209                drb_difficulty: 10,
1210                drb_upgrade_difficulty: 20,
1211            };
1212
1213            let anvil = Anvil::new()
1214                .args(["--slots-in-an-epoch", "0", "--balance", "1000000"])
1215                .spawn();
1216
1217            let l1_client = L1Client::anvil(&anvil).expect("failed to create l1 client");
1218            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1219
1220            let l1_signer_key = anvil_provider.anvil().keys()[0].clone();
1221            let signer = LocalSigner::from(l1_signer_key);
1222
1223            Self {
1224                config,
1225                priv_keys,
1226                state_key_pairs,
1227                master_map,
1228                l1_url: anvil_provider.anvil().endpoint().parse().unwrap(),
1229                l1_opt: L1ClientOptions {
1230                    stake_table_update_interval: Duration::from_secs(5),
1231                    l1_events_max_block_range: 1000,
1232                    l1_polling_interval: Duration::from_secs(1),
1233                    subscription_timeout: Duration::from_secs(5),
1234                    ..Default::default()
1235                },
1236                anvil_provider: Some(anvil_provider),
1237                signer,
1238                state_relay_url: None,
1239                builder_port: None,
1240                upgrades: Default::default(),
1241            }
1242        }
1243    }
1244
    /// Fully-assembled configuration for an in-memory test network of
    /// `NUM_NODES` nodes; produced by [`TestConfigBuilder::build`].
    #[derive(Clone)]
    pub struct TestConfig<const NUM_NODES: usize> {
        /// Consensus configuration shared by all test nodes.
        config: HotShotConfig<SeqTypes>,
        /// One BLS private key per node (index-aligned with `state_key_pairs`).
        priv_keys: Vec<BLSPrivKey>,
        /// One light-client state key pair per node.
        state_key_pairs: Vec<StateKeyPair>,
        /// Shared map backing the in-memory test network.
        master_map: Arc<MasterMap<PubKey>>,
        /// L1 (Ethereum) RPC endpoint.
        l1_url: Url,
        /// Options for the L1 client.
        l1_opt: L1ClientOptions,
        /// Anvil-backed provider; present when the L1 is a local Anvil node.
        anvil_provider: Option<AnvilFillProvider>,
        /// Signer used for L1 deployments and transactions.
        signer: LocalSigner<SigningKey>,
        /// Optional state relay service URL.
        state_relay_url: Option<Url>,
        /// Optional fixed port for the builder API.
        builder_port: Option<u16>,
        /// Configured protocol upgrades, keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
1259
    impl<const NUM_NODES: usize> TestConfig<NUM_NODES> {
        /// Number of nodes in the test network (length of the key vectors).
        pub fn num_nodes(&self) -> usize {
            self.priv_keys.len()
        }

        /// Shared HotShot consensus configuration.
        pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
            &self.config
        }

        /// Replaces the builder URLs in the consensus config.
        pub fn set_builder_urls(&mut self, builder_urls: vec1::Vec1<Url>) {
            self.config.builder_urls = builder_urls;
        }

        /// Fixed builder port, if one was configured.
        pub fn builder_port(&self) -> Option<u16> {
            self.builder_port
        }

        /// Signer used for L1 deployments and transactions.
        pub fn signer(&self) -> LocalSigner<SigningKey> {
            self.signer.clone()
        }

        /// L1 RPC endpoint.
        pub fn l1_url(&self) -> Url {
            self.l1_url.clone()
        }

        /// Anvil-backed provider, if the L1 is a local Anvil instance.
        pub fn anvil(&self) -> Option<&AnvilFillProvider> {
            self.anvil_provider.as_ref()
        }

        /// Configured upgrades converted into an [`UpgradeMap`].
        pub fn get_upgrade_map(&self) -> UpgradeMap {
            self.upgrades.clone().into()
        }

        /// Configured upgrades, keyed by version.
        pub fn upgrades(&self) -> BTreeMap<Version, Upgrade> {
            self.upgrades.clone()
        }

        /// Deterministic staking keys for all nodes (see [`staking_priv_keys`]).
        pub fn staking_priv_keys(&self) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
            staking_priv_keys(&self.priv_keys, &self.state_key_pairs, self.num_nodes())
        }

        /// One `(address, provider)` pair per node; each provider's wallet is
        /// backed by that node's staking key and connected to the L1.
        pub fn validator_providers(
            &self,
        ) -> Vec<(Address, impl Provider + Clone + use<NUM_NODES>)> {
            self.staking_priv_keys()
                .into_iter()
                .map(|(signer, ..)| {
                    (
                        signer.address(),
                        ProviderBuilder::new()
                            .wallet(EthereumWallet::from(signer))
                            .connect_http(self.l1_url.clone()),
                    )
                })
                .collect()
        }

        /// Initializes every node concurrently with default state, no storage,
        /// null catchup/event consumer, and the given version bundle.
        pub async fn init_nodes(
            &self,
            upgrade: versions::Upgrade,
        ) -> Vec<SequencerContext<network::Memory, NoStorage>> {
            join_all((0..self.num_nodes()).map(|i| async move {
                self.init_node(
                    i,
                    ValidatedState::default(),
                    no_storage::Options,
                    Some(NullStateCatchup::default()),
                    None,
                    &NoMetrics,
                    STAKE_TABLE_CAPACITY_FOR_TEST,
                    NullEventConsumer,
                    upgrade,
                    Default::default(),
                )
                .await
            }))
            .await
        }

        /// The staked-peer configs of all nodes.
        pub fn known_nodes_with_stake(&self) -> &[PeerConfig<SeqTypes>] {
            &self.config.known_nodes_with_stake
        }

        /// Initializes node `i`: builds its validator config, joins the
        /// in-memory network, wires up persistence, catchup, the L1 client,
        /// membership and the stake-table fetcher, then starts a
        /// [`SequencerContext`] for it.
        #[allow(clippy::too_many_arguments)]
        pub async fn init_node<P: PersistenceOptions>(
            &self,
            i: usize,
            mut state: ValidatedState,
            mut persistence_opt: P,
            state_peers: Option<impl StateCatchup + 'static>,
            storage: Option<RequestResponseStorage>,
            metrics: &dyn Metrics,
            stake_table_capacity: usize,
            event_consumer: impl EventConsumer + 'static,
            upgrade: versions::Upgrade,
            upgrades: BTreeMap<Version, Upgrade>,
        ) -> SequencerContext<network::Memory, P::Persistence> {
            let config = self.config.clone();
            let my_peer_config = &config.known_nodes_with_stake[i];
            let is_da = config.known_da_nodes.contains(my_peer_config);

            // Create our own (private, local) validator config
            let validator_config = ValidatorConfig {
                public_key: my_peer_config.stake_table_entry.stake_key,
                private_key: self.priv_keys[i].clone(),
                stake_value: my_peer_config.stake_table_entry.stake_amount,
                state_public_key: self.state_key_pairs[i].ver_key(),
                state_private_key: self.state_key_pairs[i].sign_key(),
                is_da,
                x25519_keypair: None,
                p2p_addr: None,
            };

            // DA nodes additionally subscribe to the DA topic.
            let topics = if is_da {
                vec![Topic::Global, Topic::Da]
            } else {
                vec![Topic::Global]
            };

            let network = Arc::new(MemoryNetwork::new(
                &my_peer_config.stake_table_entry.stake_key,
                &self.master_map,
                &topics,
                None,
            ));

            // Make sure the builder account is funded.
            let builder_account = Self::builder_key().fee_account();
            tracing::info!(%builder_account, "prefunding builder account");
            state.prefund_account(builder_account, U256::MAX.into());

            let persistence = persistence_opt.create().await.unwrap();

            let chain_config = state.chain_config.resolve().unwrap_or_default();

            // Create an empty list of catchup providers
            let catchup_providers = ParallelStateCatchup::new(&[], Duration::from_secs(5));

            // If we have the state peers, add them
            if let Some(state_peers) = state_peers {
                catchup_providers.add_provider(Arc::new(state_peers));
            }

            // If we have a working local catchup provider, add it
            match persistence
                .clone()
                .into_catchup_provider(BackoffParams::default())
            {
                Ok(local_catchup) => {
                    catchup_providers.add_provider(local_catchup);
                },
                Err(e) => {
                    tracing::warn!(
                        "Failed to create local catchup provider: {e:#}. Only using remote \
                         catchup."
                    );
                },
            };

            let l1_client = self
                .l1_opt
                .clone()
                .connect(vec![self.l1_url.clone()])
                .expect("failed to create L1 client");
            l1_client.spawn_tasks().await;

            // Stake-table fetcher backed by the catchup providers, persistence,
            // and the L1 client.
            let fetcher = Fetcher::new(
                Arc::new(catchup_providers.clone()),
                Arc::new(Mutex::new(persistence.clone())),
                l1_client.clone(),
                chain_config,
            );
            fetcher.spawn_update_loop().await;

            let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
            let mut membership = EpochCommittees::new_stake(
                config.known_nodes_with_stake.clone(),
                config.known_da_nodes.clone(),
                block_reward,
                fetcher,
                config.epoch_height,
            );
            membership.reload_stake(50).await;

            let membership = Arc::new(RwLock::new(membership));
            let persistence = Arc::new(persistence);

            // NOTE(review): `&persistence.clone()` clones the Arc and borrows
            // the temporary — `&persistence` alone looks sufficient; confirm.
            let coordinator = EpochMembershipCoordinator::new(
                membership,
                config.epoch_height,
                &persistence.clone(),
            );

            // NOTE(review): `upgrade.base` is passed for both version arguments
            // of `NodeState::new` — confirm the second should not differ.
            let node_state = NodeState::new(
                i as u64,
                chain_config,
                l1_client,
                Arc::new(catchup_providers.clone()),
                upgrade.base,
                coordinator.clone(),
                upgrade.base,
            )
            .with_current_version(upgrade.base)
            .with_genesis(state)
            .with_epoch_height(config.epoch_height)
            .with_upgrades(upgrades)
            .with_epoch_start_block(config.epoch_start_block);

            tracing::info!(
                i,
                key = %my_peer_config.stake_table_entry.stake_key,
                state_key = %my_peer_config.state_ver_key,
                "starting node",
            );

            SequencerContext::init(
                NetworkConfig {
                    config,
                    // For testing, we use a fake network, so the rest of the network config beyond
                    // the base consensus config does not matter.
                    ..Default::default()
                },
                upgrade,
                validator_config,
                coordinator,
                node_state,
                storage,
                catchup_providers,
                persistence,
                network,
                self.state_relay_url.clone(),
                metrics,
                stake_table_capacity,
                event_consumer,
                Default::default(),
            )
            .await
            .unwrap()
        }

        /// Deterministic fee key pair for the (pre-funded) builder account.
        pub fn builder_key() -> EthKeyPair {
            FeeAccount::generated_from_seed_indexed([1; 32], 0).1
        }
    }
1504
1505    // Wait for decide event, make sure it matches submitted transaction. Return the block number
1506    // containing the transaction and the block payload size
1507    pub async fn wait_for_decide_on_handle(
1508        events: &mut (impl Stream<Item = Event> + Unpin),
1509        submitted_txn: &Transaction,
1510    ) -> (u64, usize) {
1511        let commitment = submitted_txn.commit();
1512
1513        // Keep getting events until we see a Decide event
1514        loop {
1515            let event = events.next().await.unwrap();
1516            tracing::info!("Received event from handle: {event:?}");
1517
1518            if let Decide { leaf_chain, .. } = event.event {
1519                if let Some((height, size)) =
1520                    leaf_chain.iter().find_map(|LeafInfo { leaf, .. }| {
1521                        if leaf
1522                            .block_payload()
1523                            .as_ref()?
1524                            .transaction_commitments(leaf.block_header().metadata())
1525                            .contains(&commitment)
1526                        {
1527                            let size = leaf.block_payload().unwrap().encode().len();
1528                            Some((leaf.block_header().block_number(), size))
1529                        } else {
1530                            None
1531                        }
1532                    })
1533                {
1534                    tracing::info!(height, "transaction {commitment} sequenced");
1535                    return (height, size);
1536                }
1537            } else {
1538                // Keep waiting
1539            }
1540        }
1541    }
1542
1543    /// Waits until a node has reached the given target epoch (exclusive).
1544    /// The function returns once the first event indicates an epoch higher than `target_epoch`.
1545    pub async fn wait_for_epochs(
1546        events: &mut (
1547                 impl futures::Stream<Item = hotshot_types::event::Event<SeqTypes>> + std::marker::Unpin
1548             ),
1549        epoch_height: u64,
1550        target_epoch: u64,
1551    ) {
1552        tracing::info!(target_epoch, "waiting for epoch");
1553        while let Some(event) = events.next().await {
1554            if let EventType::Decide { leaf_chain, .. } = event.event {
1555                let leaf = leaf_chain[0].leaf.clone();
1556                let epoch = leaf.epoch(epoch_height);
1557                tracing::debug!(
1558                    "Node decided at height: {}, epoch: {epoch:?}",
1559                    leaf.height(),
1560                );
1561
1562                if epoch > Some(EpochNumber::new(target_epoch)) {
1563                    break;
1564                }
1565            }
1566        }
1567        tracing::info!(target_epoch, "epoch started");
1568    }
1569}
1570
1571#[cfg(test)]
1572mod test {
1573    use alloy::node_bindings::Anvil;
1574    use espresso_types::{Header, MOCK_SEQUENCER_VERSIONS, NamespaceId, Payload, Transaction};
1575    use futures::StreamExt;
1576    use hotshot::types::EventType::Decide;
1577    use hotshot_example_types::node_types::TEST_VERSIONS;
1578    use hotshot_types::{
1579        event::LeafInfo,
1580        traits::block_contents::{BlockHeader, BlockPayload},
1581    };
1582    use testing::{TestConfigBuilder, wait_for_decide_on_handle};
1583
1584    use self::testing::run_test_builder;
1585    use super::*;
1586
    /// Smoke test: spin up an in-memory network of 5 nodes plus a simple
    /// builder, submit one transaction, and wait for it to be decided.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_skeleton_instantiation() {
        // Assign `config` so it isn't dropped early.
        let anvil = Anvil::new().spawn();
        let url = anvil.endpoint_url();
        const NUM_NODES: usize = 5;
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;

        // Point all nodes at the just-started builder.
        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;

        let handle_0 = &handles[0];

        // Hook the builder up to the event stream from the first node
        builder_task.start(Box::new(handle_0.event_stream().await));

        // Subscribe before starting consensus so no events are missed.
        let mut events = handle_0.event_stream().await;

        for handle in handles.iter() {
            handle.start_consensus().await;
        }

        // Submit target transaction to handle
        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3]);
        handles[0]
            .submit_transaction(txn.clone())
            .await
            .expect("Failed to submit transaction");
        tracing::info!("Submitted transaction to handle: {txn:?}");

        wait_for_decide_on_handle(&mut events, &txn).await;
    }
1624
1625    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1626    async fn test_header_invariants() {
1627        let success_height = 30;
1628        // Assign `config` so it isn't dropped early.
1629        let anvil = Anvil::new().spawn();
1630        let url = anvil.endpoint_url();
1631        const NUM_NODES: usize = 5;
1632        let mut config = TestConfigBuilder::<NUM_NODES>::default()
1633            .l1_url(url)
1634            .build();
1635
1636        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
1637
1638        config.set_builder_urls(vec1::vec1![builder_url]);
1639        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;
1640
1641        let handle_0 = &handles[0];
1642
1643        let mut events = handle_0.event_stream().await;
1644
1645        // Hook the builder up to the event stream from the first node
1646        builder_task.start(Box::new(handle_0.event_stream().await));
1647
1648        for handle in handles.iter() {
1649            handle.start_consensus().await;
1650        }
1651
1652        let mut parent = {
1653            // TODO refactor repeated code from other tests
1654            let (genesis_payload, genesis_ns_table) =
1655                Payload::from_transactions([], &ValidatedState::default(), &NodeState::mock())
1656                    .await
1657                    .unwrap();
1658
1659            let genesis_state = NodeState::mock();
1660            Header::genesis(
1661                &genesis_state,
1662                genesis_payload,
1663                &genesis_ns_table,
1664                TEST_VERSIONS.test.base,
1665            )
1666        };
1667
1668        loop {
1669            let event = events.next().await.unwrap();
1670            tracing::info!("Received event from handle: {event:?}");
1671            let Decide { leaf_chain, .. } = event.event else {
1672                continue;
1673            };
1674            tracing::info!("Got decide {leaf_chain:?}");
1675
1676            // Check that each successive header satisfies invariants relative to its parent: all
1677            // the fields which should be monotonic are.
1678            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
1679                let header = leaf.block_header().clone();
1680                if header.height() == 0 {
1681                    parent = header;
1682                    continue;
1683                }
1684                assert_eq!(header.height(), parent.height() + 1);
1685                assert!(header.timestamp() >= parent.timestamp());
1686                assert!(header.l1_head() >= parent.l1_head());
1687                assert!(header.l1_finalized() >= parent.l1_finalized());
1688                parent = header;
1689            }
1690
1691            if parent.height() >= success_height {
1692                break;
1693            }
1694        }
1695    }
1696}