Skip to main content

espresso_node/
lib.rs

1mod external_event_handler;
2mod message_compat_tests;
3mod proposal_fetcher;
4mod request_response;
5mod startup_catchup;
6
7pub mod api;
8pub mod catchup;
9pub mod consensus_handle;
10pub mod context;
11pub mod genesis;
12pub use espresso_keyset as keyset;
13pub mod network;
14pub mod options;
15pub mod persistence;
16pub mod run;
17pub mod state;
18pub mod state_cert;
19pub mod state_signature;
20pub mod util;
21
22use std::{fmt::Debug, marker::PhantomData, num::NonZeroU64, sync::Arc, time::Duration};
23
24use alloy::primitives::U256;
25use anyhow::Context;
26use async_lock::Mutex;
27use catchup::{ParallelStateCatchup, StatePeers};
28use context::SequencerContext;
29use derivative::Derivative;
30use dyn_clone::clone_box;
31use espresso_types::{
32    BackoffParams, EpochCommittees, EpochRewardsCalculator, L1ClientOptions, NodeState, PubKey,
33    SeqTypes, ValidatedState,
34    traits::{EventConsumer, MembershipPersistence},
35    v0::traits::SequencerPersistence,
36    v0_3::Fetcher,
37};
38pub use genesis::Genesis;
39use genesis::L1Finalized;
40use hotshot::{
41    traits::implementations::{
42        CdnMetricsValue, CdnTopic, CombinedNetworks, GossipConfig, KeyPair, Libp2pNetwork,
43        MemoryNetwork, PushCdnNetwork, RequestResponseConfig, WrappedSignatureKey,
44        derive_libp2p_multiaddr, derive_libp2p_peer_id,
45    },
46    types::SignatureKey,
47};
48use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
49use hotshot_new_protocol::network::cliquenet::Cliquenet;
50use hotshot_orchestrator::client::{OrchestratorClient, get_complete_config};
51use hotshot_types::{
52    ValidatorConfig,
53    addr::NetAddr,
54    data::ViewNumber,
55    epoch_membership::EpochMembershipCoordinator,
56    light_client::{StateKeyPair, StateSignKey},
57    message::UpgradeLock,
58    signature_key::{BLSPrivKey, BLSPubKey},
59    traits::{
60        metrics::{Metrics, NoMetrics},
61        network::ConnectedNetwork,
62        node_implementation::{NodeImplementation, NodeType},
63        storage::Storage,
64    },
65    utils::BuilderCommitment,
66    x25519,
67};
68use libp2p::Multiaddr;
69use moka::future::Cache;
70use network::libp2p::split_off_peer_id;
71use options::Identity;
72pub use options::Options;
73use proposal_fetcher::ProposalFetcherConfig;
74pub use run::main;
75use serde::{Deserialize, Serialize};
76use tokio::select;
77use tracing::info;
78use url::Url;
79use vbs::version::StaticVersion;
80
81use crate::request_response::data_source::Storage as RequestResponseStorage;
82
/// Limit passed to `EpochCommittees::reload_stake` when [`init_node`] reloads stake tables at
/// startup.
// NOTE(review): presumably the number of most recent stake tables to reload — confirm against
// the `reload_stake` implementation.
pub const RECENT_STAKE_TABLES_LIMIT: u64 = 20;
84
/// Marker type for the sequencer node, generic over the network implementation `N` and the
/// persistence layer `P`.
///
/// No value of `N` or `P` is stored: the only field is a `PhantomData` that ties the type
/// parameters to the struct. The `bound = ""` arguments stop the derived impls from adding
/// their own trait bounds on `N` and `P`.
#[derive(Derivative, Serialize, Deserialize)]
#[derivative(
    Copy(bound = ""),
    Debug(bound = ""),
    Default(bound = ""),
    PartialEq(bound = ""),
    Eq(bound = ""),
    Hash(bound = "")
)]
pub struct Node<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(PhantomData<fn(&N, &P)>);
96
// Using derivative to derive Clone triggers the clippy lint
// https://rust-lang.github.io/rust-clippy/master/index.html#/incorrect_clone_impl_on_copy_type
// so `Clone` is implemented by hand, in the `*self` form that lint expects for a `Copy` type.
// NOTE(review): the lint appears to have since been renamed `non_canonical_clone_impl` —
// confirm and update the link if so.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Clone for Node<N, P> {
    /// Returns a copy of `self`; `Node` is `Copy`, so this is a plain dereference.
    fn clone(&self) -> Self {
        *self
    }
}
104
/// Version marker (`StaticVersion<0, 1>`) used in this file as the version parameter of
/// [`StatePeers`].
pub type SequencerApiVersion = StaticVersion<0, 1>;
106
/// Makes [`Node`] a HotShot node implementation for [`SeqTypes`]: the network is `N` itself and
/// storage is the persistence layer `P` behind an `Arc`.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> NodeImplementation<SeqTypes>
    for Node<N, P>
{
    /// Network implementation the node communicates over.
    type Network = N;
    /// Persistence layer, shared through an `Arc`.
    type Storage = Arc<P>;
}
113
/// Networking, key and catchup parameters consumed by [`init_node`].
///
/// The `libp2p_*` gossip fields are forwarded one-to-one into the `GossipConfig` built in
/// [`init_node`].
#[derive(Clone, Debug)]
pub struct NetworkParams {
    /// The address where a CDN marshal is located
    pub cdn_endpoint: String,
    /// URL of the orchestrator; [`init_node`] builds its `OrchestratorClient` from it.
    pub orchestrator_url: Url,
    /// URL of the state relay server, handed to `SequencerContext::init`.
    pub state_relay_server_url: Url,

    /// The URLs of the builders to use for submitting transactions
    pub builder_urls: Vec<Url>,

    /// BLS private key; the node's public key and its Libp2p peer ID are derived from it.
    pub private_staking_key: BLSPrivKey,
    /// Signing key from which the node's state key pair is derived.
    pub private_state_key: StateSignKey,
    /// URLs of the peers used as remote state catchup providers.
    pub state_peers: Vec<Url>,
    /// Peers to fetch the network config from when none is found in persistence. If this is
    /// `None` (and nothing is persisted), the config is loaded from the orchestrator instead.
    pub config_peers: Option<Vec<Url>>,
    /// Backoff parameters used for catchup requests (both peer and local providers).
    pub catchup_backoff: BackoffParams,
    /// Base timeout for catchup requests to peers.
    pub catchup_base_timeout: Duration,
    /// Timeout for local catchup provider requests.
    pub local_catchup_timeout: Duration,
    /// Per-step timeout for the startup stake-table catchup walk
    /// (`bootstrap_epoch_window`).
    pub bootstrap_epoch_catchup_timeout: Duration,
    /// Number of blocks between new-protocol consensus garbage collection passes.
    pub new_protocol_consensus_gc_interval: NonZeroU64,
    /// The address to advertise as our public API's URL
    pub public_api_url: Option<Url>,
    /// Cliquenet network address.
    pub cliquenet_bind_addr: NetAddr,
    /// Cliquenet address to advertise to other nodes (registered in the stake table).
    pub cliquenet_advertise_addr: Option<NetAddr>,
    /// X25519 secret key.
    pub x25519_secret_key: x25519::SecretKey,
    /// The address to send to other Libp2p nodes to contact us. Required for orchestrator
    /// bootstrap; optional otherwise. When set, it is added to the swarm as an external address
    /// so peers can reach us behind NAT.
    pub libp2p_advertise_address: Option<String>,
    /// The address to bind to for Libp2p
    pub libp2p_bind_address: String,
    /// The (optional) bootstrap node addresses for Libp2p. If supplied, these will
    /// override the bootstrap nodes specified in the config file.
    pub libp2p_bootstrap_nodes: Option<Vec<Multiaddr>>,

    /// The heartbeat interval
    pub libp2p_heartbeat_interval: Duration,

    /// The number of past heartbeats to gossip about
    pub libp2p_history_gossip: usize,
    /// The number of past heartbeats to remember the full messages for
    pub libp2p_history_length: usize,

    /// The target number of peers in the mesh
    pub libp2p_mesh_n: usize,
    /// The maximum number of peers in the mesh
    pub libp2p_mesh_n_high: usize,
    /// The minimum number of peers in the mesh
    pub libp2p_mesh_n_low: usize,
    /// The minimum number of mesh peers that must be outbound
    pub libp2p_mesh_outbound_min: usize,

    /// The maximum gossip message size
    pub libp2p_max_gossip_transmit_size: usize,

    /// The maximum direct message size. Used as both the request and the response size limit
    /// of the Libp2p request/response config.
    pub libp2p_max_direct_transmit_size: u64,

    /// The maximum number of message IDs to include in a single Libp2p IHAVE message
    /// (forwarded to the gossip config's `max_ihave_length`).
    // NOTE(review): this field previously shared its doc text with
    // `libp2p_max_ihave_messages`; wording follows the libp2p gossipsub setting of the same
    // name — confirm.
    pub libp2p_max_ihave_length: usize,

    /// The maximum number of IHAVE messages to accept from a Libp2p peer within a heartbeat
    pub libp2p_max_ihave_messages: usize,

    /// The time period that message hashes are stored in the cache
    pub libp2p_published_message_ids_cache_time: Duration,

    /// The time to wait for a Libp2p message requested through IWANT following an IHAVE advertisement
    pub libp2p_iwant_followup_time: Duration,

    /// The maximum number of Libp2p messages we will process in a given RPC
    pub libp2p_max_messages_per_rpc: Option<usize>,

    /// How many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them
    pub libp2p_gossip_retransmission: u32,

    /// If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score
    pub libp2p_flood_publish: bool,

    /// The time period that Libp2p message hashes are stored in the cache
    pub libp2p_duplicate_cache_time: Duration,

    /// Time to live for Libp2p fanout peers
    pub libp2p_fanout_ttl: Duration,

    /// Initial delay in each Libp2p heartbeat
    pub libp2p_heartbeat_initial_delay: Duration,

    /// Affects how many Libp2p peers we will emit gossip to at each heartbeat (a factor, not
    /// an absolute count)
    pub libp2p_gossip_factor: f64,

    /// Minimum number of Libp2p peers to emit gossip to during a heartbeat
    pub libp2p_gossip_lazy: usize,
}
215
/// Parameters used by [`init_node`] to create the L1 client.
pub struct L1Params {
    /// L1 provider URLs the client connects to.
    pub urls: Vec<Url>,
    /// Client options; the L1 client is built from these via `with_metrics(..).connect(urls)`.
    pub options: L1ClientOptions,
}
220
/// Initialise a sequencer node and return its [`SequencerContext`].
///
/// In order, this function:
/// 1. publishes build, identity and public-key information as text metrics;
/// 2. derives the Libp2p bind/advertise addresses and the Libp2p peer ID;
/// 3. obtains the network config from persistence, else from `config_peers`, else from the
///    orchestrator, applies the genesis-derived overrides, and persists the config if it was
///    freshly fetched;
/// 4. creates the CDN network, the L1 client (validating the fee contract and waiting for the
///    L1 genesis block), the catchup providers, the stake-table fetcher and the membership;
/// 5. creates the Libp2p network, waits until either it or the CDN is ready, combines the two,
///    creates the Cliquenet network and passes everything to `SequencerContext::init`.
///
/// # Parameters
/// - `genesis`: genesis configuration (versions, upgrades, chain config, epoch parameters,
///   prefunded accounts, L1 finalized reference).
/// - `network_params`: keys, URLs and Libp2p/Cliquenet/catchup settings.
/// - `metrics`: metrics sink; also passed to the networks, L1 client and persistence.
/// - `persistence`: persistence layer; used to load/save the network config and as a local
///   catchup provider when possible.
/// - `l1_params`: L1 provider URLs and client options.
/// - `storage`, `event_consumer`, `proposal_fetcher_config`: passed through to
///   `SequencerContext::init`.
/// - `is_da`: if set, the node also subscribes to the DA CDN topic and is marked as DA in its
///   validator config.
/// - `identity`: operator-supplied identity information, published as metrics only.
///
/// # Errors
/// Returns an error if an address or peer ID cannot be derived, the config cannot be loaded,
/// fetched or saved, a required advertise address is missing for orchestrator bootstrap, a
/// bootstrap node address cannot be parsed, or creating the CDN, L1 client, Libp2p, Cliquenet
/// or sequencer context fails, or if fee contract validation fails.
///
/// # Panics
/// Panics if converting `network_params.builder_urls` into the config's builder URL type
/// fails (see the note at that line).
#[allow(clippy::too_many_arguments)]
pub async fn init_node<P>(
    genesis: Genesis,
    network_params: NetworkParams,
    metrics: Box<dyn Metrics>,
    mut persistence: P,
    l1_params: L1Params,
    storage: Option<RequestResponseStorage>,
    event_consumer: impl EventConsumer + 'static,
    is_da: bool,
    identity: Identity,
    proposal_fetcher_config: ProposalFetcherConfig,
) -> anyhow::Result<SequencerContext<network::Production, P>>
where
    P: SequencerPersistence + MembershipPersistence + DhtPersistentStorage,
    Arc<P>: Storage<SeqTypes>,
{
    // Expose git information via status API.
    let info = espresso_utils::build_info!();
    metrics
        .text_family(
            "version".into(),
            vec!["rev".into(), "desc".into(), "timestamp".into()],
        )
        .create(vec![
            info.git_sha.into(),
            info.git_describe.into(),
            info.git_commit_timestamp.into(),
        ]);

    metrics
        .text_family(
            "build_info".into(),
            vec![
                "modified".into(),
                "branch".into(),
                "debug".into(),
                "features".into(),
            ],
        )
        .create(vec![
            info.git_dirty.into(),
            info.git_branch.into(),
            info.is_debug.to_string(),
            env!("VERGEN_CARGO_FEATURES").into(),
        ]);

    // Expose Node Entity Information via the status/metrics API
    // Unset identity fields are published as empty strings.
    metrics
        .text_family(
            "node_identity_general".into(),
            vec![
                "name".into(),
                "description".into(),
                "company_name".into(),
                "company_website".into(),
                "operating_system".into(),
                "node_type".into(),
                "network_type".into(),
            ],
        )
        .create(vec![
            identity.node_name.unwrap_or_default(),
            identity.node_description.unwrap_or_default(),
            identity.company_name.unwrap_or_default(),
            identity
                .company_website
                .map(|u| u.into())
                .unwrap_or_default(),
            identity.operating_system.unwrap_or_default(),
            identity.node_type.unwrap_or_default(),
            identity.network_type.unwrap_or_default(),
        ]);

    // Expose Node Identity Location via the status/metrics API
    metrics
        .text_family(
            "node_identity_location".into(),
            vec!["country".into(), "latitude".into(), "longitude".into()],
        )
        .create(vec![
            identity.country_code.unwrap_or_default(),
            identity.latitude.map(|l| l.to_string()).unwrap_or_default(),
            identity
                .longitude
                .map(|l| l.to_string())
                .unwrap_or_default(),
        ]);

    // Expose icons for node dashboard via the status/metrics API
    // The "small" labels carry the 14x14 icons and the "large" labels the 24x24 icons.
    metrics
        .text_family(
            "node_identity_icon".into(),
            vec![
                "small_1x".into(),
                "small_2x".into(),
                "small_3x".into(),
                "large_1x".into(),
                "large_2x".into(),
                "large_3x".into(),
            ],
        )
        .create(vec![
            identity
                .icon_14x14_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_14x14_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_1x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_2x
                .map(|u| u.to_string())
                .unwrap_or_default(),
            identity
                .icon_24x24_3x
                .map(|u| u.to_string())
                .unwrap_or_default(),
        ]);

    // Stick our public key in `metrics` so it is easily accessible via the status API.
    let pub_key = BLSPubKey::from_private(&network_params.private_staking_key);
    metrics
        .text_family("node".into(), vec!["key".into()])
        .create(vec![pub_key.to_string()]);

    // Parse the Libp2p bind and advertise addresses to multiaddresses
    let libp2p_bind_address = derive_libp2p_multiaddr(&network_params.libp2p_bind_address)
        .with_context(|| {
            format!(
                "Failed to derive Libp2p bind address of {}",
                &network_params.libp2p_bind_address
            )
        })?;
    let advertise_multiaddr = network_params
        .libp2p_advertise_address
        .as_ref()
        .map(|addr| {
            derive_libp2p_multiaddr(addr)
                .with_context(|| format!("Failed to derive Libp2p advertise address of {addr}"))
        })
        .transpose()?;
    // Only an address that parses as `NetAddr` and is judged non-global yields `false`. An
    // unset address, or one that fails to parse, falls into the `_` arm and counts as global.
    let advertise_is_global = match network_params
        .libp2p_advertise_address
        .as_deref()
        .and_then(|s| s.parse::<NetAddr>().ok())
    {
        Some(parsed) if !parsed.is_probably_global() => {
            tracing::error!(
                "Libp2p advertise address {parsed} is probably not publicly routable. This is \
                 fine for local testing (demo-native, docker-compose) but is wrong for any real \
                 deployment: remote peers will fail to dial us."
            );
            false
        },
        _ => true,
    };

    // Always pass the configured address to the orchestrator stake table; that path is
    // testing-only and demo-native legitimately uses loopback.
    let libp2p_announce_addresses: Vec<Multiaddr> = advertise_multiaddr.iter().cloned().collect();

    // Only register the advertise address as a libp2p `external_address` when it looks
    // publicly routable: announcing local/private values via Identify / Kademlia poisons peer
    // routing tables in production. Local tests don't need it since peers find each other via
    // `libp2p_bootstrap_nodes`.
    let libp2p_external_addresses: Vec<Multiaddr> = if advertise_is_global {
        advertise_multiaddr.iter().cloned().collect()
    } else {
        Vec::new()
    };

    info!("Libp2p bind address: {}", libp2p_bind_address);
    info!("Libp2p announce addresses: {:?}", libp2p_announce_addresses);
    info!("Libp2p external addresses: {:?}", libp2p_external_addresses);

    // Orchestrator client
    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
    let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);

    // Only the orchestrator bootstrap path publishes these into the stake table; overridden
    // below when needed.
    let mut validator_config = ValidatorConfig {
        public_key: pub_key,
        private_key: network_params.private_staking_key,
        stake_value: U256::ONE,
        state_public_key: state_key_pair.ver_key(),
        state_private_key: state_key_pair.sign_key(),
        is_da,
        x25519_keypair: None,
        p2p_addr: None,
    };

    // Derive our Libp2p public key from our private key
    let libp2p_public_key = derive_libp2p_peer_id::<<SeqTypes as NodeType>::SignatureKey>(
        &validator_config.private_key,
    )
    .with_context(|| "Failed to derive Libp2p peer ID")?;

    // Print the libp2p public key
    info!("Starting Libp2p with PeerID: {libp2p_public_key}");

    // Pick the network config source. The tuple is
    // `(config, wait_for_orchestrator, persist_config)`: a config loaded from persistence is
    // not saved again, and only the orchestrator path waits for the orchestrator afterwards.
    let loaded_network_config_from_persistence = persistence.load_config().await?;
    let (mut network_config, wait_for_orchestrator, persist_config) = match (
        loaded_network_config_from_persistence,
        network_params.config_peers,
    ) {
        (Some(config), _) => {
            tracing::warn!("loaded network config from storage, rejoining existing network");
            (config, false, false)
        },
        // If we were told to fetch the config from an already-started peer, do so.
        (None, Some(peers)) => {
            tracing::warn!(?peers, "loading network config from peers");
            let peers = StatePeers::<SequencerApiVersion>::from_urls(
                peers,
                network_params.catchup_backoff,
                network_params.catchup_base_timeout,
                &NoMetrics,
            );
            let config = peers.fetch_config(validator_config.clone()).await?;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            (config, false, true)
        },
        // Otherwise, this is a fresh network; load from the orchestrator.
        (None, None) => {
            tracing::warn!("loading network config from orchestrator");
            tracing::warn!(
                "waiting for other nodes to connect, DO NOT RESTART until fully connected"
            );

            // Publish our cliquenet `connect_info` into the stake table from
            // `NEW_PROTOCOL_VERSION` on, so peers can dial us. Modify `validator_config`
            // in place so the same `connect_info` is sent later when posting to
            // `/ready` (the orchestrator equality-checks against `known_nodes_with_stake`).
            if genesis.base_version >= versions::NEW_PROTOCOL_VERSION {
                let advertise_addr = network_params.cliquenet_advertise_addr.clone().context(
                    "ESPRESSO_NODE_CLIQUENET_ADVERTISE_ADDRESS must be set when bootstrapping a \
                     Cliquenet network from the orchestrator",
                )?;
                validator_config.x25519_keypair =
                    Some(x25519::Keypair::from(&network_params.x25519_secret_key));
                validator_config.p2p_addr = Some(advertise_addr);
            }

            let bootstrap_advertise_addr = libp2p_announce_addresses.first().cloned().context(
                "ESPRESSO_NODE_LIBP2P_ADVERTISE_ADDRESS must be set when bootstrapping a libp2p \
                 network from the orchestrator",
            )?;

            let config = get_complete_config(
                &orchestrator_client,
                validator_config.clone(),
                // Register in our Libp2p advertise address and public key so other nodes
                // can contact us on startup
                Some(bootstrap_advertise_addr),
                Some(libp2p_public_key),
            )
            .await?
            .0;

            tracing::warn!(
                node_id = config.node_index,
                stake_table = ?config.config.known_nodes_with_stake,
                "loaded config",
            );
            tracing::warn!("all nodes connected");
            (config, true, true)
        },
    };

    // Apply the HotShot config parameters of the upgrade registered for `upgrade_version`, if
    // genesis defines one.
    if let Some(upgrade) = genesis.upgrades.get(&genesis.upgrade_version) {
        upgrade.set_hotshot_config_parameters(&mut network_config.config);
    }

    // Override the builder URLs in the network config with the ones from the command line
    // if any were provided
    if !network_params.builder_urls.is_empty() {
        // NOTE(review): `unwrap` panics if the conversion fails. It is guarded by the
        // non-empty check above, which is presumably the only failure mode — confirm against
        // the target type, or surface the error with `?` instead.
        network_config.config.builder_urls = network_params.builder_urls.try_into().unwrap();
    }

    let epoch_height = genesis.epoch_height.unwrap_or_default();
    let drb_difficulty = genesis.drb_difficulty.unwrap_or_default();
    let drb_upgrade_difficulty = genesis.drb_upgrade_difficulty.unwrap_or_default();
    let epoch_start_block = genesis.epoch_start_block.unwrap_or_default();
    let stake_table_capacity = genesis
        .stake_table_capacity
        .unwrap_or(hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY);

    let version_upgrade = versions::Upgrade::new(genesis.base_version, genesis.upgrade_version);

    tracing::warn!("setting epoch_height={epoch_height:?}");
    tracing::warn!("setting drb_difficulty={drb_difficulty:?}");
    tracing::warn!("setting drb_upgrade_difficulty={drb_upgrade_difficulty:?}");
    tracing::warn!("setting epoch_start_block={epoch_start_block:?}");
    tracing::warn!("setting stake_table_capacity={stake_table_capacity:?}");
    tracing::warn!("setting version_upgrade={version_upgrade}");
    network_config.config.epoch_height = epoch_height;
    network_config.config.drb_difficulty = drb_difficulty;
    network_config.config.drb_upgrade_difficulty = drb_upgrade_difficulty;
    network_config.config.epoch_start_block = epoch_start_block;
    network_config.config.stake_table_capacity = stake_table_capacity;

    if let Some(da_committees) = &genesis.da_committees {
        tracing::warn!("setting da_committees from genesis: {da_committees:?}");
        network_config.config.da_committees = da_committees.clone();
    }

    // Save *after* the above updates. The orchestrator and peer fetched configs don't include
    // epoch_height, drb_difficulty, etc. those come from genesis and are applied above.
    // Saving before these updates would persist zeros for those values
    if persist_config {
        persistence.save_config(&network_config).await?;
    }

    // If the `Libp2p` bootstrap nodes were supplied via the command line, override those
    // present in the config file.
    // This override happens after the save above, so it is not persisted.
    if let Some(bootstrap_nodes) = network_params.libp2p_bootstrap_nodes {
        if let Some(libp2p_config) = network_config.libp2p_config.as_mut() {
            // If the libp2p configuration is present, we can override the bootstrap nodes.

            // Split off the peer ID from the addresses
            libp2p_config.bootstrap_nodes = bootstrap_nodes
                .into_iter()
                .map(split_off_peer_id)
                .collect::<Result<Vec<_>, _>>()
                .with_context(|| "Failed to parse peer ID from bootstrap node")?;
        } else {
            // If not, don't try launching with them. Eventually we may want to
            // provide a default configuration here instead.
            tracing::warn!("No libp2p configuration found, ignoring supplied bootstrap nodes");
        }
    }

    let node_index = network_config.node_index;

    // If we are a DA node, we need to subscribe to the DA topic
    let topics = {
        let mut topics = vec![CdnTopic::Global];
        if is_da {
            topics.push(CdnTopic::Da);
        }
        topics
    };

    // Initialize the push CDN network (and perform the initial connection)
    let cdn_network = PushCdnNetwork::new(
        network_params.cdn_endpoint,
        topics,
        KeyPair {
            public_key: WrappedSignatureKey(validator_config.public_key),
            private_key: validator_config.private_key.clone(),
        },
        CdnMetricsValue::new(&*metrics),
    )
    .with_context(|| format!("Failed to create CDN network {node_index}"))?;

    // Configure gossipsub based on the command line options
    let gossip_config = GossipConfig {
        heartbeat_interval: network_params.libp2p_heartbeat_interval,
        history_gossip: network_params.libp2p_history_gossip,
        history_length: network_params.libp2p_history_length,
        mesh_n: network_params.libp2p_mesh_n,
        mesh_n_high: network_params.libp2p_mesh_n_high,
        mesh_n_low: network_params.libp2p_mesh_n_low,
        mesh_outbound_min: network_params.libp2p_mesh_outbound_min,
        max_ihave_messages: network_params.libp2p_max_ihave_messages,
        max_transmit_size: network_params.libp2p_max_gossip_transmit_size,
        max_ihave_length: network_params.libp2p_max_ihave_length,
        published_message_ids_cache_time: network_params.libp2p_published_message_ids_cache_time,
        iwant_followup_time: network_params.libp2p_iwant_followup_time,
        max_messages_per_rpc: network_params.libp2p_max_messages_per_rpc,
        gossip_retransmission: network_params.libp2p_gossip_retransmission,
        flood_publish: network_params.libp2p_flood_publish,
        duplicate_cache_time: network_params.libp2p_duplicate_cache_time,
        fanout_ttl: network_params.libp2p_fanout_ttl,
        heartbeat_initial_delay: network_params.libp2p_heartbeat_initial_delay,
        gossip_factor: network_params.libp2p_gossip_factor,
        gossip_lazy: network_params.libp2p_gossip_lazy,
    };

    // Configure request/response based on the command line options
    // The same limit is applied to requests and responses.
    let request_response_config = RequestResponseConfig {
        request_size_maximum: network_params.libp2p_max_direct_transmit_size,
        response_size_maximum: network_params.libp2p_max_direct_transmit_size,
    };

    let l1_client = l1_params
        .options
        .with_metrics(&*metrics)
        .connect(l1_params.urls)
        .with_context(|| "failed to create L1 client")?;

    info!("Validating fee contract");

    genesis.validate_fee_contract(&l1_client).await?;

    info!("Fee contract validated. Spawning L1 tasks");

    l1_client.spawn_tasks().await;

    info!(
        "L1 tasks spawned. Waiting for L1 genesis: {:?}",
        genesis.l1_finalized
    );

    // Resolve the L1 genesis block: either given directly, or awaited by block number or by
    // timestamp.
    let l1_genesis = match genesis.l1_finalized {
        L1Finalized::Block(b) => b,
        L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
        L1Finalized::Timestamp { timestamp } => {
            l1_client
                .wait_for_finalized_block_with_timestamp(U256::from(timestamp.unix_timestamp()))
                .await
        },
    };

    info!("L1 genesis found: {:?}", l1_genesis);

    let genesis_chain_config = genesis.header.chain_config;
    let mut genesis_state = ValidatedState {
        chain_config: genesis_chain_config.into(),
        ..Default::default()
    };
    for (address, amount) in genesis.accounts {
        tracing::warn!(%address, %amount, "Prefunding account for demo");
        genesis_state.prefund_account(address, amount);
    }

    // Create the list of parallel catchup providers
    let state_catchup_providers =
        ParallelStateCatchup::new(&[], network_params.local_catchup_timeout);

    // Add the state peers to the list
    let state_peers = StatePeers::<SequencerApiVersion>::from_urls(
        network_params.state_peers,
        network_params.catchup_backoff,
        network_params.catchup_base_timeout,
        &*metrics,
    );
    state_catchup_providers.add_provider(Arc::new(state_peers));

    // Add the local (persistence) catchup provider to the list (if we can)
    match persistence
        .clone()
        .into_catchup_provider(network_params.catchup_backoff)
    {
        Ok(catchup) => {
            state_catchup_providers.add_provider(Arc::new(catchup));
        },
        Err(e) => {
            tracing::warn!(
                "Failed to create local catchup provider: {e:#}. Only using remote catchup."
            );
        },
    };

    persistence.enable_metrics(&*metrics);

    let fetcher = Fetcher::new(
        Arc::new(state_catchup_providers.clone()),
        Arc::new(Mutex::new(persistence.clone())),
        l1_client.clone(),
        genesis.chain_config,
    );

    info!("Spawning update loop");

    fetcher.spawn_update_loop().await;
    info!("Update loop spawned. Fetching block reward");

    // NOTE(review): `.ok()` discards any fetch error, leaving `block_reward` as `None` with
    // only the `None` visible in the log below — confirm this best-effort behaviour is
    // intended.
    let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
    info!("Block reward fetched: {:?}", block_reward);
    // Create the HotShot membership
    let mut membership = EpochCommittees::new_stake(
        network_config.config.known_nodes_with_stake.clone(),
        network_config.config.known_da_nodes.clone(),
        block_reward,
        fetcher,
        epoch_height,
    );
    info!("Membership created. Reloading stake");
    membership.reload_stake(RECENT_STAKE_TABLES_LIMIT).await;
    info!("Stake reloaded");

    check_cliquenet_info_registered(
        &membership,
        &validator_config.public_key,
        genesis.base_version,
        genesis.chain_config.stake_table_contract,
        &l1_client,
    )
    .await;

    // From here on persistence is shared behind an `Arc` (shadowing the owned value).
    let persistence = Arc::new(persistence);
    let coordinator = EpochMembershipCoordinator::new(
        membership,
        network_config.config.epoch_height,
        &persistence,
    );

    let epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));

    let instance_state = NodeState {
        chain_config: genesis.chain_config,
        genesis_chain_config,
        l1_client,
        genesis_header: genesis.header,
        genesis_state,
        l1_genesis: Some(l1_genesis),
        node_id: node_index,
        upgrades: genesis.upgrades,
        current_version: genesis.base_version,
        epoch_height: Some(epoch_height),
        state_catchup: Arc::new(state_catchup_providers.clone()),
        coordinator: coordinator.clone(),
        genesis_version: genesis.genesis_version,
        // NOTE(review): same expression as the `epoch_start_block` local computed earlier.
        epoch_start_block: genesis.epoch_start_block.unwrap_or_default(),
        epoch_rewards_calculator,
        // Single-entry caches; only the finalized height entry has a time-to-live (30s).
        light_client_contract_address: Cache::builder().max_capacity(1).build(),
        token_contract_address: Cache::builder().max_capacity(1).build(),
        finalized_hotshot_height: Cache::builder()
            .max_capacity(1)
            .time_to_live(Duration::from_secs(30))
            .build(),
    };

    let combined_network = {
        info!("Initializing Libp2p network");
        let p2p_network = Libp2pNetwork::from_config(
            network_config.clone(),
            persistence.clone(),
            gossip_config,
            request_response_config,
            libp2p_bind_address,
            libp2p_external_addresses,
            &validator_config.public_key,
            // We need the private key so we can derive our Libp2p keypair
            // (using https://docs.rs/blake3/latest/blake3/fn.derive_key.html)
            &validator_config.private_key,
            hotshot::traits::implementations::Libp2pMetricsValue::new(&*metrics),
        )
        .await
        .with_context(|| {
            format!(
                "Failed to create libp2p network on node {node_index}; binding to {:?}",
                network_params.libp2p_bind_address
            )
        })?;

        info!("Libp2p network initialized");

        // Proceed as soon as either network reports ready; the other is not awaited here.
        tracing::warn!("Waiting for at least one connection to be initialized");
        select! {
            _ = cdn_network.wait_for_ready() => {
                tracing::warn!("CDN connection initialized");
            },
            _ = p2p_network.wait_for_ready() => {
                tracing::warn!("P2P connection initialized");
            },
        };

        // Combine the CDN and P2P networks
        CombinedNetworks::new(cdn_network, p2p_network, Some(Duration::from_secs(1)))
    };

    // TODO: This creates a separate UpgradeLock from the one HotShot will
    // use. They should share a single lock so upgrade certificate updates
    // are visible to both.
    let cliquenet = Cliquenet::create(
        "espresso",
        pub_key,
        network_params.x25519_secret_key.into(),
        network_params.cliquenet_bind_addr.clone(),
        [],
        UpgradeLock::new(version_upgrade),
        clone_box(&*metrics),
    )
    .await?;

    let network = Arc::new(combined_network);

    let mut ctx = SequencerContext::init(
        network_config,
        version_upgrade,
        validator_config,
        coordinator,
        instance_state,
        storage,
        state_catchup_providers,
        persistence,
        network.clone(),
        cliquenet,
        Some(network_params.state_relay_server_url),
        &*metrics,
        // NOTE(review): this reads `genesis.stake_table.capacity`, whereas the network config
        // above uses `genesis.stake_table_capacity` (with a default) — confirm both sources
        // are meant to differ.
        genesis.stake_table.capacity,
        event_consumer,
        proposal_fetcher_config,
        network_params.bootstrap_epoch_catchup_timeout,
        network_params.new_protocol_consensus_gc_interval,
    )
    .await?;

    // Only a config freshly obtained from the orchestrator requires waiting on it.
    if wait_for_orchestrator {
        ctx = ctx.wait_for_orchestrator(orchestrator_client);
    }

    Ok(ctx)
}
843
844pub fn empty_builder_commitment() -> BuilderCommitment {
845    BuilderCommitment::from_bytes([])
846}
847
848/// On the version immediately preceding CLIQUENET, log an error if this
849/// validator has a stake-table entry without an x25519 key or p2p address.
850/// Skipped unless StakeTableV3 is deployed (otherwise the operator has no
851/// actionable path), detected via `getVersion()` on the proxy.
852async fn check_cliquenet_info_registered(
853    membership: &EpochCommittees,
854    pub_key: &BLSPubKey,
855    current_version: vbs::version::Version,
856    stake_table_contract: Option<alloy::primitives::Address>,
857    l1_client: &espresso_types::v0::L1Client,
858) {
859    if current_version != versions::EPOCH_REWARD_VERSION {
860        return;
861    }
862    let Some(addr) = stake_table_contract else {
863        return;
864    };
865    let stake_table =
866        hotshot_contract_adapter::sol_types::StakeTableV3::new(addr, l1_client.provider.clone());
867    let mut major = None;
868    let mut last_err = None;
869    for attempt in 1..=3 {
870        match stake_table.getVersion().call().await {
871            Ok(v) => {
872                major = Some(v.majorVersion);
873                break;
874            },
875            Err(e) => {
876                tracing::warn!(attempt, %e, "failed to read StakeTable getVersion(), retrying");
877                last_err = Some(e);
878                if attempt < 3 {
879                    tokio::time::sleep(Duration::from_secs(1)).await;
880                }
881            },
882        }
883    }
884    let Some(major) = major else {
885        tracing::warn!(
886            err = ?last_err,
887            "could not read StakeTable getVersion() after 3 attempts; skipping check"
888        );
889        return;
890    };
891    if major < 3 {
892        tracing::info!(
893            major,
894            "StakeTableV3 not deployed; skipping network-info registration check"
895        );
896        return;
897    }
898    let Some(cfg) = membership.latest_peer_config(pub_key) else {
899        return;
900    };
901    if cfg.connect_info.is_some() {
902        return;
903    }
904    tracing::error!(
905        bls_key = %pub_key,
906        "Validator has no x25519 key or p2p address registered on-chain. After the CLIQUENET \
907         upgrade activates, the validator will be excluded from the active set and stop earning \
908         rewards. To fix: (1) generate an x25519 keypair if needed (`keygen --scheme x25519 \
909         --out keys.env`), then (2) register it on-chain (`staking-cli update-network-config \
910         --x25519-key <ESPRESSO_NODE_PUBLIC_X25519_KEY> --p2p-addr <host:port>`)."
911    );
912}
913
914#[cfg(any(test, feature = "testing"))]
915pub mod testing {
916    use std::{
917        cmp::max,
918        collections::{BTreeMap, HashMap},
919        net::Ipv4Addr,
920        time::Duration,
921    };
922
923    use alloy::{
924        network::EthereumWallet,
925        node_bindings::{Anvil, AnvilInstance},
926        primitives::{Address, U256},
927        providers::{
928            Provider, ProviderBuilder, RootProvider,
929            fillers::{
930                BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
931            },
932            layers::AnvilProvider,
933        },
934        signers::{k256::ecdsa::SigningKey, local::LocalSigner},
935    };
936    use catchup::NullStateCatchup;
937    use committable::Committable;
938    use espresso_contract_deployer::{
939        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
940        network_config::light_client_genesis_from_stake_table,
941    };
942    use espresso_types::{
943        EpochVersion, Event, FeeAccount, L1Client, NetworkConfig, PubKey, SeqTypes, Transaction,
944        Upgrade, UpgradeMap,
945        eth_signature_key::EthKeyPair,
946        v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup},
947    };
948    use futures::{
949        future::join_all,
950        stream::{Stream, StreamExt},
951    };
952    use hotshot::{
953        traits::{
954            BlockPayload,
955            implementations::{MasterMap, MemoryNetwork},
956        },
957        types::EventType,
958    };
959    use hotshot_builder_refactored::service::{
960        BuilderConfig as LegacyBuilderConfig, GlobalState as LegacyGlobalState,
961    };
962    use hotshot_testing::block_builder::{
963        BuilderTask, SimpleBuilderImplementation, TestBuilderImplementation,
964    };
965    use hotshot_types::{
966        HotShotConfig, PeerConfig,
967        data::EpochNumber,
968        event::LeafInfo,
969        light_client::StateKeyPair,
970        message::UpgradeLock,
971        new_protocol::CoordinatorEvent,
972        traits::{
973            EncodeBytes, block_contents::BlockHeader, metrics::NoMetrics, network::Topic,
974            signature_key::BuilderSignatureKey,
975        },
976    };
977    use rand::SeedableRng as _;
978    use rand_chacha::ChaCha20Rng;
979    use staking_cli::demo::{DelegationConfig, StakingKeySet, StakingTransactions};
980    use test_utils::reserve_tcp_port;
981    use tokio::spawn;
982    use vbs::version::{StaticVersionType, Version};
983    use versions::EPOCH_VERSION;
984
985    use super::*;
986    use crate::{
987        catchup::ParallelStateCatchup,
988        persistence::no_storage::{self, NoStorage},
989    };
990
    /// Stake table capacity used by the test helpers in this module.
    const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
    /// Transaction channel capacity passed to the legacy builder in tests.
    const BUILDER_CHANNEL_CAPACITY_FOR_TEST: usize = 128;
    /// Concrete type of the Anvil-backed provider stored in the test config
    /// types of this module.
    type AnvilFillProvider = AnvilProvider<
        FillProvider<
            JoinFill<
                alloy::providers::Identity,
                JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
            >,
            RootProvider,
        >,
    >;
1002    struct LegacyBuilderImplementation {
1003        global_state: Arc<LegacyGlobalState<SeqTypes>>,
1004    }
1005
1006    impl BuilderTask<SeqTypes> for LegacyBuilderImplementation {
1007        fn start(
1008            self: Box<Self>,
1009            stream: Box<
1010                dyn futures::prelude::Stream<Item = hotshot::types::Event<SeqTypes>>
1011                    + std::marker::Unpin
1012                    + Send
1013                    + 'static,
1014            >,
1015        ) {
1016            spawn(async move {
1017                let res = self.global_state.start_event_loop(stream).await;
1018                tracing::error!(?res, "testing legacy builder service exited");
1019            });
1020        }
1021    }
1022
1023    pub async fn run_legacy_builder<const NUM_NODES: usize>(
1024        port: Option<u16>,
1025        max_block_size: Option<u64>,
1026    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
1027        let builder_key_pair = TestConfig::<0>::builder_key();
1028        let port = match port {
1029            Some(p) => p,
1030            None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
1031        };
1032
1033        // This should never fail.
1034        let url: Url = format!("http://localhost:{port}")
1035            .parse()
1036            .expect("Failed to parse builder URL");
1037
1038        // create the global state
1039        let global_state = LegacyGlobalState::new(
1040            LegacyBuilderConfig {
1041                builder_keys: (builder_key_pair.fee_account(), builder_key_pair),
1042                max_api_waiting_time: Duration::from_secs(1),
1043                max_block_size_increment_period: Duration::from_secs(60),
1044                maximize_txn_capture_timeout: Duration::from_millis(100),
1045                txn_garbage_collect_duration: Duration::from_secs(60),
1046                txn_channel_capacity: BUILDER_CHANNEL_CAPACITY_FOR_TEST,
1047                tx_status_cache_capacity: 81920,
1048                base_fee: 10,
1049            },
1050            NodeState::default(),
1051            max_block_size.unwrap_or(300),
1052            NUM_NODES,
1053        );
1054
1055        // Create and spawn the tide-disco app to serve the builder APIs
1056        let app = Arc::clone(&global_state)
1057            .into_app()
1058            .expect("Failed to create builder tide-disco app");
1059
1060        spawn(async move {
1061            app.serve(
1062                format!("http://0.0.0.0:{port}")
1063                    .parse::<Url>()
1064                    .expect("Failed to parse builder listener"),
1065                EpochVersion::instance(),
1066            )
1067            .await
1068        });
1069
1070        // Pass on the builder task to be injected in the testing harness
1071        (Box::new(LegacyBuilderImplementation { global_state }), url)
1072    }
1073
1074    pub async fn run_test_builder<const NUM_NODES: usize>(
1075        port: Option<u16>,
1076    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
1077        let port = match port {
1078            Some(p) => p,
1079            None => reserve_tcp_port().expect("OS should have ephemeral ports available"),
1080        };
1081
1082        // This should never fail.
1083        let url: Url = format!("http://localhost:{port}")
1084            .parse()
1085            .expect("Failed to parse builder URL");
1086        tracing::info!("Starting test builder on {url}");
1087
1088        let task = <SimpleBuilderImplementation as TestBuilderImplementation<SeqTypes>>::start(
1089            NUM_NODES,
1090            format!("http://0.0.0.0:{port}")
1091                .parse()
1092                .expect("Failed to parse builder listener"),
1093            (),
1094            HashMap::new(),
1095        )
1096        .await;
1097
1098        (task, url)
1099    }
1100
    /// Builder for [`TestConfig`]; apply overrides through its methods and
    /// finish with `build()`.
    pub struct TestConfigBuilder<const NUM_NODES: usize> {
        /// HotShot configuration for the test network.
        config: HotShotConfig<SeqTypes>,
        /// BLS private keys of the nodes.
        priv_keys: Vec<BLSPrivKey>,
        /// State key pairs of the nodes.
        state_key_pairs: Vec<StateKeyPair>,
        /// Master map handed over to the resulting [`TestConfig`].
        master_map: Arc<MasterMap<PubKey>>,
        /// URL of the L1 RPC endpoint.
        l1_url: Url,
        /// Options for the L1 client.
        l1_opt: L1ClientOptions,
        /// Anvil-backed provider; `None` once a custom L1 URL has been set.
        anvil_provider: Option<AnvilFillProvider>,
        /// L1 signer used as deployer wallet and admin in `set_upgrades`.
        signer: LocalSigner<SigningKey>,
        /// Optional state relay server URL.
        state_relay_url: Option<Url>,
        /// Optional builder port.
        builder_port: Option<u16>,
        /// Upgrades keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
1114
1115    pub fn staking_priv_keys(
1116        priv_keys: &[BLSPrivKey],
1117        state_key_pairs: &[StateKeyPair],
1118        num_nodes: usize,
1119    ) -> Vec<StakingKeySet> {
1120        let seed = [42u8; 32];
1121        let mut rng = ChaCha20Rng::from_seed(seed); // Create a deterministic RNG
1122        let eth_key_pairs = (0..num_nodes).map(|_| SigningKey::random(&mut rng).into());
1123        eth_key_pairs
1124            .zip(priv_keys.iter())
1125            .zip(state_key_pairs.iter())
1126            .map(|((eth, bls), state)| StakingKeySet {
1127                signer: eth,
1128                bls: bls.clone().into(),
1129                state: state.clone(),
1130                x25519: x25519::Keypair::generate().expect("x25519 keypair"),
1131                p2p_addr: "127.0.0.1:8080".parse().unwrap(),
1132            })
1133            .collect()
1134    }
1135
1136    impl<const NUM_NODES: usize> TestConfigBuilder<NUM_NODES> {
1137        pub fn builder_port(mut self, builder_port: Option<u16>) -> Self {
1138            self.builder_port = builder_port;
1139            self
1140        }
1141
1142        pub fn state_relay_url(mut self, url: Url) -> Self {
1143            self.state_relay_url = Some(url);
1144            self
1145        }
1146
1147        /// Sets the Anvil provider, constructed using the Anvil instance.
1148        /// Also sets the L1 URL based on the Anvil endpoint.
1149        /// The `AnvilProvider` can be used to configure the Anvil, for example,
1150        /// by enabling interval mining after the test network is initialized.
1151        pub fn anvil_provider(mut self, anvil: AnvilInstance) -> Self {
1152            self.l1_url = anvil.endpoint().parse().unwrap();
1153            let l1_client = L1Client::anvil(&anvil).expect("create l1 client");
1154            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1155            self.anvil_provider = Some(anvil_provider);
1156            self
1157        }
1158
1159        /// Sets a custom L1 URL, overriding any previously set Anvil instance URL.
1160        /// This removes the anvil provider, as well as it is no longer needed
1161        pub fn l1_url(mut self, l1_url: Url) -> Self {
1162            self.anvil_provider = None;
1163            self.l1_url = l1_url;
1164            self
1165        }
1166
1167        pub fn l1_opt(mut self, opt: L1ClientOptions) -> Self {
1168            self.l1_opt = opt;
1169            self
1170        }
1171
1172        pub fn signer(mut self, signer: LocalSigner<SigningKey>) -> Self {
1173            self.signer = signer;
1174            self
1175        }
1176
1177        pub fn upgrades(mut self, v: Version, upgrades: BTreeMap<Version, Upgrade>) -> Self {
1178            let upgrade = upgrades.get(&v).unwrap();
1179            upgrade.set_hotshot_config_parameters(&mut self.config);
1180            self.upgrades = upgrades;
1181            self
1182        }
1183
1184        /// Version specific upgrade setup. Extend to future upgrades
1185        /// by adding a branch to the `match` statement.
1186        pub async fn set_upgrades(mut self, version: Version) -> Self {
1187            let upgrade = match version {
1188                version if version >= EPOCH_VERSION => {
1189                    tracing::debug!(?version, "upgrade version");
1190                    let blocks_per_epoch = self.config.epoch_height;
1191                    let epoch_start_block = self.config.epoch_start_block;
1192
1193                    let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1194                        &self.config.hotshot_stake_table(),
1195                        STAKE_TABLE_CAPACITY_FOR_TEST,
1196                    )
1197                    .unwrap();
1198
1199                    let validators =
1200                        staking_priv_keys(&self.priv_keys, &self.state_key_pairs, NUM_NODES);
1201
1202                    let deployer = ProviderBuilder::new()
1203                        .wallet(EthereumWallet::from(self.signer.clone()))
1204                        .connect_http(self.l1_url.clone());
1205
1206                    let mut contracts = Contracts::new();
1207                    let args = DeployerArgsBuilder::default()
1208                        .deployer(deployer.clone())
1209                        .rpc_url(self.l1_url.clone())
1210                        .mock_light_client(true)
1211                        .genesis_lc_state(genesis_state)
1212                        .genesis_st_state(genesis_stake)
1213                        .blocks_per_epoch(blocks_per_epoch)
1214                        .epoch_start_block(epoch_start_block)
1215                        .exit_escrow_period(U256::from(max(
1216                            blocks_per_epoch * 15 + 100,
1217                            DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1218                        )))
1219                        .multisig_pauser(self.signer.address())
1220                        .token_name("Espresso".to_string())
1221                        .token_symbol("ESP".to_string())
1222                        .initial_token_supply(U256::from(3590000000u64))
1223                        .ops_timelock_delay(U256::from(0))
1224                        .ops_timelock_admin(self.signer.address())
1225                        .ops_timelock_proposers(vec![self.signer.address()])
1226                        .ops_timelock_executors(vec![self.signer.address()])
1227                        .safe_exit_timelock_delay(U256::from(10))
1228                        .safe_exit_timelock_admin(self.signer.address())
1229                        .safe_exit_timelock_proposers(vec![self.signer.address()])
1230                        .safe_exit_timelock_executors(vec![self.signer.address()])
1231                        .build()
1232                        .unwrap();
1233                    args.deploy_to_stake_table_v3(&mut contracts)
1234                        .await
1235                        .expect("failed to deploy all contracts");
1236
1237                    let st_addr = contracts
1238                        .address(Contract::StakeTableProxy)
1239                        .expect("StakeTableProxy address not found");
1240                    StakingTransactions::create(
1241                        self.l1_url.clone(),
1242                        &deployer,
1243                        st_addr,
1244                        validators,
1245                        None,
1246                        DelegationConfig::default(),
1247                    )
1248                    .await
1249                    .expect("stake table setup failed")
1250                    .apply_all()
1251                    .await
1252                    .expect("send all txns failed");
1253
1254                    Upgrade::pos_view_based(st_addr)
1255                },
1256                _ => panic!("Upgrade not configured for version {version:?}"),
1257            };
1258
1259            let mut upgrades = std::collections::BTreeMap::new();
1260            upgrade.set_hotshot_config_parameters(&mut self.config);
1261            upgrades.insert(version, upgrade);
1262
1263            self.upgrades = upgrades;
1264            self
1265        }
1266
1267        pub fn epoch_height(mut self, epoch_height: u64) -> Self {
1268            self.config.epoch_height = epoch_height;
1269            self
1270        }
1271
1272        pub fn epoch_start_block(mut self, start_block: u64) -> Self {
1273            self.config.epoch_start_block = start_block;
1274            self
1275        }
1276
1277        pub fn build(self) -> TestConfig<NUM_NODES> {
1278            TestConfig {
1279                config: self.config,
1280                priv_keys: self.priv_keys,
1281                state_key_pairs: self.state_key_pairs,
1282                master_map: self.master_map,
1283                l1_url: self.l1_url,
1284                l1_opt: self.l1_opt,
1285                signer: self.signer,
1286                state_relay_url: self.state_relay_url,
1287                builder_port: self.builder_port,
1288                upgrades: self.upgrades,
1289                anvil_provider: self.anvil_provider,
1290            }
1291        }
1292
1293        pub fn stake_table_capacity(mut self, stake_table_capacity: usize) -> Self {
1294            self.config.stake_table_capacity = stake_table_capacity;
1295            self
1296        }
1297    }
1298
1299    impl<const NUM_NODES: usize> Default for TestConfigBuilder<NUM_NODES> {
1300        fn default() -> Self {
1301            let num_nodes = NUM_NODES;
1302
1303            // Generate keys for the nodes.
1304            let seed = [0; 32];
1305            let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..num_nodes)
1306                .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed(seed, i as u64))
1307                .unzip();
1308            let state_key_pairs = (0..num_nodes)
1309                .map(|i| StateKeyPair::generate_from_seed_indexed(seed, i as u64))
1310                .collect::<Vec<_>>();
1311            let known_nodes_with_stake = pub_keys
1312                .iter()
1313                .zip(&state_key_pairs)
1314                .map(|(pub_key, state_key_pair)| PeerConfig::<SeqTypes> {
1315                    stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
1316                    state_ver_key: state_key_pair.ver_key(),
1317                    connect_info: None,
1318                })
1319                .collect::<Vec<_>>();
1320
1321            let master_map = MasterMap::new();
1322
1323            let builder_port = reserve_tcp_port().unwrap();
1324
1325            let config: HotShotConfig<SeqTypes> = HotShotConfig {
1326                fixed_leader_for_gpuvid: 0,
1327                num_nodes_with_stake: num_nodes.try_into().unwrap(),
1328                known_da_nodes: known_nodes_with_stake.clone(),
1329                da_committees: Default::default(),
1330                known_nodes_with_stake: known_nodes_with_stake.clone(),
1331                next_view_timeout: Duration::from_secs(5).as_millis() as u64,
1332                num_bootstrap: 1usize,
1333                da_staked_committee_size: num_nodes,
1334                view_sync_timeout: Duration::from_secs(1),
1335                data_request_delay: Duration::from_secs(1),
1336                builder_urls: vec1::vec1![
1337                    Url::parse(&format!("http://127.0.0.1:{builder_port}")).unwrap()
1338                ],
1339                builder_timeout: Duration::from_secs(1),
1340                start_threshold: (
1341                    known_nodes_with_stake.clone().len() as u64,
1342                    known_nodes_with_stake.clone().len() as u64,
1343                ),
1344                start_proposing_view: 0,
1345                stop_proposing_view: 0,
1346                start_voting_view: 0,
1347                stop_voting_view: 0,
1348                start_proposing_time: 0,
1349                start_voting_time: 0,
1350                stop_proposing_time: 0,
1351                stop_voting_time: 0,
1352                epoch_height: 30,
1353                epoch_start_block: 1,
1354                stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
1355                drb_difficulty: 10,
1356                drb_upgrade_difficulty: 20,
1357            };
1358
1359            let anvil = Anvil::new()
1360                .args(["--slots-in-an-epoch", "0", "--balance", "1000000"])
1361                .spawn();
1362
1363            let l1_client = L1Client::anvil(&anvil).expect("failed to create l1 client");
1364            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1365
1366            let l1_signer_key = anvil_provider.anvil().keys()[0].clone();
1367            let signer = LocalSigner::from(l1_signer_key);
1368
1369            Self {
1370                config,
1371                priv_keys,
1372                state_key_pairs,
1373                master_map,
1374                l1_url: anvil_provider.anvil().endpoint().parse().unwrap(),
1375                l1_opt: L1ClientOptions {
1376                    stake_table_update_interval: Duration::from_secs(5),
1377                    l1_events_max_block_range: 1000,
1378                    l1_polling_interval: Duration::from_secs(1),
1379                    subscription_timeout: Duration::from_secs(5),
1380                    ..Default::default()
1381                },
1382                anvil_provider: Some(anvil_provider),
1383                signer,
1384                state_relay_url: None,
1385                builder_port: None,
1386                upgrades: Default::default(),
1387            }
1388        }
1389    }
1390
    /// Configuration for a test network of sequencer nodes, produced by
    /// `TestConfigBuilder::build`.
    #[derive(Clone)]
    pub struct TestConfig<const NUM_NODES: usize> {
        /// HotShot configuration for the test network.
        config: HotShotConfig<SeqTypes>,
        /// BLS private keys of the nodes; its length is the node count.
        priv_keys: Vec<BLSPrivKey>,
        /// State key pairs of the nodes.
        state_key_pairs: Vec<StateKeyPair>,
        /// Master map passed to each node's in-memory network.
        master_map: Arc<MasterMap<PubKey>>,
        /// URL of the L1 RPC endpoint.
        l1_url: Url,
        /// Options used when connecting the L1 client.
        l1_opt: L1ClientOptions,
        /// Anvil-backed provider, if one is set.
        anvil_provider: Option<AnvilFillProvider>,
        /// L1 signer.
        signer: LocalSigner<SigningKey>,
        /// Optional state relay server URL.
        state_relay_url: Option<Url>,
        /// Optional builder port.
        builder_port: Option<u16>,
        /// Upgrades keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
1405
1406    impl<const NUM_NODES: usize> TestConfig<NUM_NODES> {
1407        pub fn num_nodes(&self) -> usize {
1408            self.priv_keys.len()
1409        }
1410
1411        pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1412            &self.config
1413        }
1414
1415        pub fn set_builder_urls(&mut self, builder_urls: vec1::Vec1<Url>) {
1416            self.config.builder_urls = builder_urls;
1417        }
1418
1419        pub fn builder_port(&self) -> Option<u16> {
1420            self.builder_port
1421        }
1422
1423        pub fn signer(&self) -> LocalSigner<SigningKey> {
1424            self.signer.clone()
1425        }
1426
1427        pub fn l1_url(&self) -> Url {
1428            self.l1_url.clone()
1429        }
1430
1431        pub fn anvil(&self) -> Option<&AnvilFillProvider> {
1432            self.anvil_provider.as_ref()
1433        }
1434
1435        pub fn get_upgrade_map(&self) -> UpgradeMap {
1436            self.upgrades.clone().into()
1437        }
1438
1439        pub fn upgrades(&self) -> BTreeMap<Version, Upgrade> {
1440            self.upgrades.clone()
1441        }
1442
1443        pub fn staking_priv_keys(&self) -> Vec<StakingKeySet> {
1444            staking_priv_keys(&self.priv_keys, &self.state_key_pairs, self.num_nodes())
1445        }
1446
1447        pub fn validator_providers(
1448            &self,
1449        ) -> Vec<(Address, impl Provider + Clone + use<NUM_NODES>)> {
1450            self.staking_priv_keys()
1451                .into_iter()
1452                .map(|key_set| {
1453                    (
1454                        key_set.signer.address(),
1455                        ProviderBuilder::new()
1456                            .wallet(EthereumWallet::from(key_set.signer))
1457                            .connect_http(self.l1_url.clone()),
1458                    )
1459                })
1460                .collect()
1461        }
1462
1463        pub async fn init_nodes(
1464            &self,
1465            upgrade: versions::Upgrade,
1466        ) -> Vec<SequencerContext<network::Memory, NoStorage>> {
1467            join_all((0..self.num_nodes()).map(|i| async move {
1468                self.init_node(
1469                    i,
1470                    ValidatedState::default(),
1471                    no_storage::Options,
1472                    Some(NullStateCatchup::default()),
1473                    None,
1474                    &NoMetrics,
1475                    STAKE_TABLE_CAPACITY_FOR_TEST,
1476                    NullEventConsumer,
1477                    upgrade,
1478                    Default::default(),
1479                )
1480                .await
1481            }))
1482            .await
1483        }
1484
1485        pub fn known_nodes_with_stake(&self) -> &[PeerConfig<SeqTypes>] {
1486            &self.config.known_nodes_with_stake
1487        }
1488
1489        #[allow(clippy::too_many_arguments)]
1490        pub async fn init_node<P: PersistenceOptions>(
1491            &self,
1492            i: usize,
1493            mut state: ValidatedState,
1494            mut persistence_opt: P,
1495            state_peers: Option<impl StateCatchup + 'static>,
1496            storage: Option<RequestResponseStorage>,
1497            metrics: &dyn Metrics,
1498            stake_table_capacity: usize,
1499            event_consumer: impl EventConsumer + 'static,
1500            upgrade: versions::Upgrade,
1501            upgrades: BTreeMap<Version, Upgrade>,
1502        ) -> SequencerContext<network::Memory, P::Persistence> {
1503            let config = self.config.clone();
1504            let my_peer_config = &config.known_nodes_with_stake[i];
1505            let is_da = config.known_da_nodes.contains(my_peer_config);
1506
1507            // Create our own (private, local) validator config
1508            let validator_config = ValidatorConfig {
1509                public_key: my_peer_config.stake_table_entry.stake_key,
1510                private_key: self.priv_keys[i].clone(),
1511                stake_value: my_peer_config.stake_table_entry.stake_amount,
1512                state_public_key: self.state_key_pairs[i].ver_key(),
1513                state_private_key: self.state_key_pairs[i].sign_key(),
1514                is_da,
1515                x25519_keypair: None,
1516                p2p_addr: None,
1517            };
1518
1519            let topics = if is_da {
1520                vec![Topic::Global, Topic::Da]
1521            } else {
1522                vec![Topic::Global]
1523            };
1524
1525            let network = Arc::new(MemoryNetwork::new(
1526                &my_peer_config.stake_table_entry.stake_key,
1527                &self.master_map,
1528                &topics,
1529                None,
1530            ));
1531
1532            // Make sure the builder account is funded.
1533            let builder_account = Self::builder_key().fee_account();
1534            tracing::info!(%builder_account, "prefunding builder account");
1535            state.prefund_account(builder_account, U256::MAX.into());
1536
1537            let persistence = persistence_opt.create().await.unwrap();
1538
1539            let chain_config = state.chain_config.resolve().unwrap_or_default();
1540
1541            // Create an empty list of catchup providers
1542            let catchup_providers = ParallelStateCatchup::new(&[], Duration::from_secs(5));
1543
1544            // If we have the state peers, add them
1545            if let Some(state_peers) = state_peers {
1546                catchup_providers.add_provider(Arc::new(state_peers));
1547            }
1548
1549            // If we have a working local catchup provider, add it
1550            match persistence
1551                .clone()
1552                .into_catchup_provider(BackoffParams::default())
1553            {
1554                Ok(local_catchup) => {
1555                    catchup_providers.add_provider(local_catchup);
1556                },
1557                Err(e) => {
1558                    tracing::warn!(
1559                        "Failed to create local catchup provider: {e:#}. Only using remote \
1560                         catchup."
1561                    );
1562                },
1563            };
1564
1565            let l1_client = self
1566                .l1_opt
1567                .clone()
1568                .connect(vec![self.l1_url.clone()])
1569                .expect("failed to create L1 client");
1570            l1_client.spawn_tasks().await;
1571
1572            let fetcher = Fetcher::new(
1573                Arc::new(catchup_providers.clone()),
1574                Arc::new(Mutex::new(persistence.clone())),
1575                l1_client.clone(),
1576                chain_config,
1577            );
1578            fetcher.spawn_update_loop().await;
1579
1580            let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
1581            let mut membership = EpochCommittees::new_stake(
1582                config.known_nodes_with_stake.clone(),
1583                config.known_da_nodes.clone(),
1584                block_reward,
1585                fetcher,
1586                config.epoch_height,
1587            );
1588            membership.reload_stake(50).await;
1589
1590            let membership = Arc::new(membership);
1591            let persistence = Arc::new(persistence);
1592
1593            let coordinator = EpochMembershipCoordinator::new(
1594                membership,
1595                config.epoch_height,
1596                &persistence.clone(),
1597            );
1598
1599            let node_state = NodeState::new(
1600                i as u64,
1601                chain_config,
1602                l1_client,
1603                Arc::new(catchup_providers.clone()),
1604                upgrade.base,
1605                coordinator.clone(),
1606                upgrade.base,
1607            )
1608            .with_current_version(upgrade.base)
1609            .with_genesis(state)
1610            .with_epoch_height(config.epoch_height)
1611            .with_upgrades(upgrades)
1612            .with_epoch_start_block(config.epoch_start_block);
1613
1614            tracing::info!(
1615                i,
1616                key = %my_peer_config.stake_table_entry.stake_key,
1617                state_key = %my_peer_config.state_ver_key,
1618                "starting node",
1619            );
1620
1621            let coordinator_network = {
1622                let keypair = x25519::Keypair::derive_from::<PubKey>(&self.priv_keys[i])
1623                    .expect("keypair derivation should succeed");
1624                let port = test_utils::reserve_tcp_port()
1625                    .expect("OS should have ephemeral ports available");
1626                let addr = NetAddr::Inet(Ipv4Addr::LOCALHOST.into(), port);
1627                let lock = UpgradeLock::<SeqTypes>::new(upgrade);
1628                Cliquenet::create(
1629                    "test-coordinator",
1630                    my_peer_config.stake_table_entry.stake_key,
1631                    keypair,
1632                    addr,
1633                    [],
1634                    lock,
1635                    Box::new(NoMetrics),
1636                )
1637                .await
1638                .expect("cliquenet creation should succeed")
1639            };
1640
1641            SequencerContext::init(
1642                NetworkConfig {
1643                    config,
1644                    // For testing, we use a fake network, so the rest of the network config beyond
1645                    // the base consensus config does not matter.
1646                    ..Default::default()
1647                },
1648                upgrade,
1649                validator_config,
1650                coordinator,
1651                node_state,
1652                storage,
1653                catchup_providers,
1654                persistence,
1655                network,
1656                coordinator_network,
1657                self.state_relay_url.clone(),
1658                metrics,
1659                stake_table_capacity,
1660                event_consumer,
1661                Default::default(),
1662                Duration::from_secs(2),
1663                NonZeroU64::new(100).unwrap(),
1664            )
1665            .await
1666            .unwrap()
1667        }
1668
1669        pub fn builder_key() -> EthKeyPair {
1670            FeeAccount::generated_from_seed_indexed([1; 32], 0).1
1671        }
1672    }
1673
1674    // Wait for decide event, make sure it matches submitted transaction. Return the block number
1675    // containing the transaction and the block payload size
1676    pub async fn wait_for_decide_on_handle(
1677        events: &mut (impl Stream<Item = CoordinatorEvent<SeqTypes>> + Unpin),
1678        submitted_txn: &Transaction,
1679    ) -> (u64, usize) {
1680        let commitment = submitted_txn.commit();
1681
1682        // Keep getting events until we see a Decide event
1683        loop {
1684            let event = events.next().await.unwrap();
1685            tracing::info!("Received event from handle: {event:?}");
1686
1687            if let CoordinatorEvent::LegacyEvent(Event {
1688                event: EventType::Decide { leaf_chain, .. },
1689                ..
1690            }) = event
1691            {
1692                if let Some((height, size)) =
1693                    leaf_chain.iter().find_map(|LeafInfo { leaf, .. }| {
1694                        if leaf
1695                            .block_payload()
1696                            .as_ref()?
1697                            .transaction_commitments(leaf.block_header().metadata())
1698                            .contains(&commitment)
1699                        {
1700                            let size = leaf.block_payload().unwrap().encode().len();
1701                            Some((leaf.block_header().block_number(), size))
1702                        } else {
1703                            None
1704                        }
1705                    })
1706                {
1707                    tracing::info!(height, "transaction {commitment} sequenced");
1708                    return (height, size);
1709                }
1710            } else {
1711                // Keep waiting
1712            }
1713        }
1714    }
1715
1716    /// Waits until a node has reached the given target epoch (exclusive).
1717    /// The function returns once the first event indicates an epoch higher than `target_epoch`.
1718    pub async fn wait_for_epochs(
1719        events: &mut (impl futures::Stream<Item = CoordinatorEvent<SeqTypes>> + std::marker::Unpin),
1720        epoch_height: u64,
1721        target_epoch: u64,
1722    ) {
1723        tracing::info!(target_epoch, "waiting for epoch");
1724        while let Some(event) = events.next().await {
1725            if let CoordinatorEvent::LegacyEvent(Event {
1726                event: EventType::Decide { leaf_chain, .. },
1727                ..
1728            }) = event
1729            {
1730                let leaf = leaf_chain[0].leaf.clone();
1731                let epoch = leaf.epoch(epoch_height);
1732                tracing::debug!(
1733                    "Node decided at height: {}, epoch: {epoch:?}",
1734                    leaf.height(),
1735                );
1736
1737                if epoch > Some(EpochNumber::new(target_epoch)) {
1738                    break;
1739                }
1740            }
1741        }
1742        tracing::info!(target_epoch, "epoch started");
1743    }
1744}
1745
#[cfg(test)]
mod test {
    use alloy::node_bindings::Anvil;
    use espresso_types::{Header, MOCK_SEQUENCER_VERSIONS, NamespaceId, Payload, Transaction};
    use futures::StreamExt;
    use hotshot::types::{Event, EventType};
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::{
        event::LeafInfo,
        new_protocol::CoordinatorEvent,
        traits::block_contents::{BlockHeader, BlockPayload},
    };
    use testing::{TestConfigBuilder, wait_for_decide_on_handle};

    use self::testing::run_test_builder;
    use super::*;

    /// Brings up a small test network with a test builder, submits one transaction through the
    /// first node and waits until a decide event containing that transaction is observed.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_skeleton_instantiation() {
        const NUM_NODES: usize = 5;

        // Keep the spawned Anvil instance in a named binding so it lives until the test ends.
        let anvil = Anvil::new().spawn();
        let l1_url = anvil.endpoint_url();
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(l1_url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;
        let first_node = &handles[0];

        // The builder consumes the legacy consensus event stream of the first node.
        builder_task.start(Box::new(
            first_node
                .consensus_handle()
                .legacy_consensus()
                .read()
                .await
                .event_stream(),
        ));

        let mut events = first_node.event_stream();

        for node in handles.iter() {
            node.start_consensus().await;
        }

        // Submit the target transaction via the first node.
        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3]);
        first_node
            .submit_transaction(txn.clone())
            .await
            .expect("Failed to submit transaction");
        tracing::info!("Submitted transaction to handle: {txn:?}");

        wait_for_decide_on_handle(&mut events, &txn).await;
    }

    /// Runs a small test network and checks, for every decided header up to a target height,
    /// that it is consistent with its parent: the height grows by exactly one while the
    /// timestamp, L1 head and L1 finalized block never go backwards.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_header_invariants() {
        const NUM_NODES: usize = 5;
        let success_height = 30;

        // Keep the spawned Anvil instance in a named binding so it lives until the test ends.
        let anvil = Anvil::new().spawn();
        let l1_url = anvil.endpoint_url();
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(l1_url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MOCK_SEQUENCER_VERSIONS).await;
        let first_node = &handles[0];

        let mut events = first_node.event_stream();

        // The builder consumes the legacy consensus event stream of the first node.
        builder_task.start(Box::new(
            first_node
                .consensus_handle()
                .legacy_consensus()
                .read()
                .await
                .event_stream(),
        ));

        for node in handles.iter() {
            node.start_consensus().await;
        }

        // Start from a genesis header built over an empty payload; it acts as the initial parent.
        let mut parent = {
            // TODO refactor repeated code from other tests
            let (payload, ns_table) =
                Payload::from_transactions([], &ValidatedState::default(), &NodeState::mock())
                    .await
                    .unwrap();

            let instance = NodeState::mock();
            Header::genesis(&instance, payload, &ns_table, TEST_VERSIONS.test.base)
        };

        loop {
            let event = events.next().await.unwrap();
            tracing::info!("Received event from handle: {event:?}");

            // Skip everything that is not a legacy decide event.
            let leaf_chain = match event {
                CoordinatorEvent::LegacyEvent(Event {
                    event: EventType::Decide { leaf_chain, .. },
                    ..
                }) => leaf_chain,
                _ => continue,
            };
            tracing::info!("Got decide {leaf_chain:?}");

            // Walk the chain in reverse order, comparing each header with the previously seen
            // one: all the fields which should be monotonic are.
            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
                let header = leaf.block_header().clone();
                // A header at height 0 has no parent to compare against; it only becomes the
                // new parent.
                if header.height() != 0 {
                    assert_eq!(header.height(), parent.height() + 1);
                    assert!(header.timestamp() >= parent.timestamp());
                    assert!(header.l1_head() >= parent.l1_head());
                    assert!(header.l1_finalized() >= parent.l1_finalized());
                }
                parent = header;
            }

            if parent.height() >= success_height {
                break;
            }
        }
    }
}