espresso_node/state_signature/relay_server/
stake_table_tracker.rs

1use std::{
2    collections::{BTreeSet, HashMap},
3    sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_lock::RwLock;
8use espresso_contract_deployer::network_config::{
9    fetch_epoch_config_from_sequencer, fetch_stake_table_from_sequencer,
10};
11use hotshot_types::{
12    data::EpochNumber, light_client::StateVerKey, stake_table::one_honest_threshold,
13    traits::signature_key::StakeTableEntryType, utils::epoch_from_block_number,
14};
15use url::Url;
16
17/// Stake table info for a specific epoch
18#[derive(Clone, Debug, Default)]
19pub struct StakeTableInfo {
20    /// Minimum weight to form an available state signature bundle
21    pub threshold: U256,
22    /// Stake table: map(vk, weight)
23    pub known_nodes: HashMap<StateVerKey, U256>,
24}
25
26/// Tracks the stake table info for each epoch
27pub struct StakeTableTrackerInner {
28    /// Sequencer endpoint to query for stake table info
29    sequencer_url: Url,
30
31    /// Blocks per epoch, should be initialized from the sequencer
32    blocks_per_epoch: Option<u64>,
33
34    /// Epoch start block, should be initialized from the sequencer
35    epoch_start_block: Option<u64>,
36
37    /// Stake table info for each epoch
38    stake_table_infos: HashMap<u64, Arc<StakeTableInfo>>,
39
40    /// Genesis stake table info
41    genesis_stake_table_info: Option<Arc<StakeTableInfo>>,
42
43    /// Queue for garbage collection
44    gc_queue: BTreeSet<u64>,
45}
46
/// How many epochs behind the most recently fetched epoch a cached stake table
/// may be before it is pruned.
const PRUNE_GAP: u64 = 2;
49
50/// Tracks the stake table info for each epoch
51pub struct StakeTableTracker {
52    inner: Arc<RwLock<StakeTableTrackerInner>>,
53}
54
55impl StakeTableTracker {
56    pub fn new(sequencer_url: Url) -> Self {
57        Self {
58            inner: Arc::new(RwLock::new(StakeTableTrackerInner {
59                sequencer_url,
60                blocks_per_epoch: None,
61                epoch_start_block: None,
62                stake_table_infos: HashMap::new(),
63                genesis_stake_table_info: None,
64                gc_queue: BTreeSet::new(),
65            })),
66        }
67    }
68
69    /// Return the genesis stake table info
70    pub async fn genesis_stake_table_info(&self) -> anyhow::Result<Arc<StakeTableInfo>> {
71        tracing::trace!("Acquire read lock for genesis stake table info");
72        let read_guard = self.inner.read().await;
73        if let Some(stake_table_info) = &read_guard.genesis_stake_table_info {
74            return Ok(stake_table_info.clone());
75        }
76        tracing::trace!("Drop read lock for genesis stake table info");
77        drop(read_guard);
78        tracing::trace!("Acquire write lock for genesis stake table info");
79        let mut write_guard = self.inner.write().await;
80
81        if let Some(stake_table_info) = &write_guard.genesis_stake_table_info {
82            return Ok(stake_table_info.clone());
83        }
84
85        let genesis_stake_table =
86            fetch_stake_table_from_sequencer(&write_guard.sequencer_url, None).await?;
87        let genesis_total_stake = genesis_stake_table.total_stakes();
88
89        tracing::debug!("Fetching genesis stake table from sequencer");
90        let genesis_stake_table_info = Arc::new(StakeTableInfo {
91            threshold: one_honest_threshold(genesis_total_stake),
92            known_nodes: genesis_stake_table
93                .into_iter()
94                .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
95                .collect(),
96        });
97        tracing::debug!("Genesis stake table info updated");
98
99        write_guard.genesis_stake_table_info = Some(genesis_stake_table_info.clone());
100        tracing::trace!("Drop write lock for genesis stake table info");
101
102        Ok(genesis_stake_table_info)
103    }
104
105    /// Return the stake table info for the given block height
106    /// If the block height is older than the epoch start block, return the genesis stake table info
107    pub async fn stake_table_info_for_block(
108        &self,
109        block_height: u64,
110    ) -> anyhow::Result<Arc<StakeTableInfo>> {
111        tracing::debug!("Fetch stake table for block {block_height}");
112
113        tracing::trace!("Acquire read lock for stake table info");
114        let read_guard = self.inner.read().await;
115        let (blocks_per_epoch, epoch_start_block) =
116            if let Some(blocks_per_epoch) = read_guard.blocks_per_epoch {
117                let epoch_start_block = read_guard.epoch_start_block.unwrap();
118                tracing::trace!("Drop read lock for stake table info");
119                drop(read_guard);
120                (blocks_per_epoch, epoch_start_block)
121            } else {
122                tracing::trace!("Drop read lock for stake table info");
123                drop(read_guard);
124                tracing::trace!("Acquire write lock for stake table info");
125                let mut write_guard = self.inner.write().await;
126                if let Some(blocks_per_epoch) = write_guard.blocks_per_epoch {
127                    (blocks_per_epoch, write_guard.epoch_start_block.unwrap())
128                } else {
129                    tracing::debug!("Fetching epoch config from sequencer");
130                    let (blocks_per_epoch, epoch_start_block) =
131                        fetch_epoch_config_from_sequencer(&write_guard.sequencer_url).await?;
132                    write_guard.blocks_per_epoch.get_or_insert(blocks_per_epoch);
133                    write_guard
134                        .epoch_start_block
135                        .get_or_insert(epoch_start_block);
136                    tracing::debug!(
137                        "Fetched epoch config from sequencer: blocks_per_epoch: {}, \
138                         epoch_start_block: {}",
139                        blocks_per_epoch,
140                        epoch_start_block
141                    );
142                    tracing::trace!("Drop write lock for stake table info");
143                    drop(write_guard);
144                    (blocks_per_epoch, epoch_start_block)
145                }
146            };
147        if block_height <= epoch_start_block || blocks_per_epoch == 0 {
148            return self.genesis_stake_table_info().await;
149        }
150
151        let epoch = epoch_from_block_number(block_height, blocks_per_epoch);
152        tracing::trace!("Acquire read lock for stake table info");
153        let read_guard = self.inner.read().await;
154        if let Some(stake_table_info) = read_guard.stake_table_infos.get(&epoch) {
155            return Ok(stake_table_info.clone());
156        }
157        tracing::trace!("Drop read lock for stake table info");
158        drop(read_guard);
159        tracing::trace!("Acquire write lock for stake table info");
160        let mut write_guard = self.inner.write().await;
161        if let Some(stake_table_info) = write_guard.stake_table_infos.get(&epoch) {
162            return Ok(stake_table_info.clone());
163        }
164
165        tracing::debug!("Fetching stake table for epoch {} from sequencer", epoch);
166        let stake_table = fetch_stake_table_from_sequencer(
167            &write_guard.sequencer_url,
168            Some(EpochNumber::new(epoch)),
169        )
170        .await?;
171        let total_stake = stake_table.total_stakes();
172
173        let stake_table_info = Arc::new(StakeTableInfo {
174            threshold: one_honest_threshold(total_stake),
175            known_nodes: stake_table
176                .into_iter()
177                .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
178                .collect(),
179        });
180
181        write_guard
182            .stake_table_infos
183            .insert(epoch, stake_table_info.clone());
184        write_guard.gc_queue.insert(epoch);
185        tracing::debug!("Stake table info for epoch {} updated", epoch);
186        // Remove the stake table info if it's older than 2 epochs
187        while let Some(&old_epoch) = write_guard.gc_queue.first() {
188            if epoch < PRUNE_GAP || old_epoch >= epoch - PRUNE_GAP {
189                break;
190            }
191            write_guard.stake_table_infos.remove(&old_epoch);
192            write_guard.gc_queue.pop_first();
193            tracing::debug!(%old_epoch, "garbage collected for epoch");
194        }
195        tracing::trace!("Drop write lock for stake table info");
196
197        Ok(stake_table_info)
198    }
199}