espresso_node/state_signature/relay_server/
stake_table_tracker.rs1use std::{
2 collections::{BTreeSet, HashMap},
3 sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_lock::RwLock;
8use espresso_contract_deployer::network_config::{
9 fetch_epoch_config_from_sequencer, fetch_stake_table_from_sequencer,
10};
11use hotshot_types::{
12 data::EpochNumber, light_client::StateVerKey, stake_table::one_honest_threshold,
13 traits::signature_key::StakeTableEntryType, utils::epoch_from_block_number,
14};
15use url::Url;
16
17#[derive(Clone, Debug, Default)]
19pub struct StakeTableInfo {
20 pub threshold: U256,
22 pub known_nodes: HashMap<StateVerKey, U256>,
24}
25
26pub struct StakeTableTrackerInner {
28 sequencer_url: Url,
30
31 blocks_per_epoch: Option<u64>,
33
34 epoch_start_block: Option<u64>,
36
37 stake_table_infos: HashMap<u64, Arc<StakeTableInfo>>,
39
40 genesis_stake_table_info: Option<Arc<StakeTableInfo>>,
42
43 gc_queue: BTreeSet<u64>,
45}
46
47const PRUNE_GAP: u64 = 2;
49
50pub struct StakeTableTracker {
52 inner: Arc<RwLock<StakeTableTrackerInner>>,
53}
54
55impl StakeTableTracker {
56 pub fn new(sequencer_url: Url) -> Self {
57 Self {
58 inner: Arc::new(RwLock::new(StakeTableTrackerInner {
59 sequencer_url,
60 blocks_per_epoch: None,
61 epoch_start_block: None,
62 stake_table_infos: HashMap::new(),
63 genesis_stake_table_info: None,
64 gc_queue: BTreeSet::new(),
65 })),
66 }
67 }
68
69 pub async fn genesis_stake_table_info(&self) -> anyhow::Result<Arc<StakeTableInfo>> {
71 tracing::trace!("Acquire read lock for genesis stake table info");
72 let read_guard = self.inner.read().await;
73 if let Some(stake_table_info) = &read_guard.genesis_stake_table_info {
74 return Ok(stake_table_info.clone());
75 }
76 tracing::trace!("Drop read lock for genesis stake table info");
77 drop(read_guard);
78 tracing::trace!("Acquire write lock for genesis stake table info");
79 let mut write_guard = self.inner.write().await;
80
81 if let Some(stake_table_info) = &write_guard.genesis_stake_table_info {
82 return Ok(stake_table_info.clone());
83 }
84
85 let genesis_stake_table =
86 fetch_stake_table_from_sequencer(&write_guard.sequencer_url, None).await?;
87 let genesis_total_stake = genesis_stake_table.total_stakes();
88
89 tracing::debug!("Fetching genesis stake table from sequencer");
90 let genesis_stake_table_info = Arc::new(StakeTableInfo {
91 threshold: one_honest_threshold(genesis_total_stake),
92 known_nodes: genesis_stake_table
93 .into_iter()
94 .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
95 .collect(),
96 });
97 tracing::debug!("Genesis stake table info updated");
98
99 write_guard.genesis_stake_table_info = Some(genesis_stake_table_info.clone());
100 tracing::trace!("Drop write lock for genesis stake table info");
101
102 Ok(genesis_stake_table_info)
103 }
104
105 pub async fn stake_table_info_for_block(
108 &self,
109 block_height: u64,
110 ) -> anyhow::Result<Arc<StakeTableInfo>> {
111 tracing::debug!("Fetch stake table for block {block_height}");
112
113 tracing::trace!("Acquire read lock for stake table info");
114 let read_guard = self.inner.read().await;
115 let (blocks_per_epoch, epoch_start_block) =
116 if let Some(blocks_per_epoch) = read_guard.blocks_per_epoch {
117 let epoch_start_block = read_guard.epoch_start_block.unwrap();
118 tracing::trace!("Drop read lock for stake table info");
119 drop(read_guard);
120 (blocks_per_epoch, epoch_start_block)
121 } else {
122 tracing::trace!("Drop read lock for stake table info");
123 drop(read_guard);
124 tracing::trace!("Acquire write lock for stake table info");
125 let mut write_guard = self.inner.write().await;
126 if let Some(blocks_per_epoch) = write_guard.blocks_per_epoch {
127 (blocks_per_epoch, write_guard.epoch_start_block.unwrap())
128 } else {
129 tracing::debug!("Fetching epoch config from sequencer");
130 let (blocks_per_epoch, epoch_start_block) =
131 fetch_epoch_config_from_sequencer(&write_guard.sequencer_url).await?;
132 write_guard.blocks_per_epoch.get_or_insert(blocks_per_epoch);
133 write_guard
134 .epoch_start_block
135 .get_or_insert(epoch_start_block);
136 tracing::debug!(
137 "Fetched epoch config from sequencer: blocks_per_epoch: {}, \
138 epoch_start_block: {}",
139 blocks_per_epoch,
140 epoch_start_block
141 );
142 tracing::trace!("Drop write lock for stake table info");
143 drop(write_guard);
144 (blocks_per_epoch, epoch_start_block)
145 }
146 };
147 if block_height <= epoch_start_block || blocks_per_epoch == 0 {
148 return self.genesis_stake_table_info().await;
149 }
150
151 let epoch = epoch_from_block_number(block_height, blocks_per_epoch);
152 tracing::trace!("Acquire read lock for stake table info");
153 let read_guard = self.inner.read().await;
154 if let Some(stake_table_info) = read_guard.stake_table_infos.get(&epoch) {
155 return Ok(stake_table_info.clone());
156 }
157 tracing::trace!("Drop read lock for stake table info");
158 drop(read_guard);
159 tracing::trace!("Acquire write lock for stake table info");
160 let mut write_guard = self.inner.write().await;
161 if let Some(stake_table_info) = write_guard.stake_table_infos.get(&epoch) {
162 return Ok(stake_table_info.clone());
163 }
164
165 tracing::debug!("Fetching stake table for epoch {} from sequencer", epoch);
166 let stake_table = fetch_stake_table_from_sequencer(
167 &write_guard.sequencer_url,
168 Some(EpochNumber::new(epoch)),
169 )
170 .await?;
171 let total_stake = stake_table.total_stakes();
172
173 let stake_table_info = Arc::new(StakeTableInfo {
174 threshold: one_honest_threshold(total_stake),
175 known_nodes: stake_table
176 .into_iter()
177 .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
178 .collect(),
179 });
180
181 write_guard
182 .stake_table_infos
183 .insert(epoch, stake_table_info.clone());
184 write_guard.gc_queue.insert(epoch);
185 tracing::debug!("Stake table info for epoch {} updated", epoch);
186 while let Some(&old_epoch) = write_guard.gc_queue.first() {
188 if epoch < PRUNE_GAP || old_epoch >= epoch - PRUNE_GAP {
189 break;
190 }
191 write_guard.stake_table_infos.remove(&old_epoch);
192 write_guard.gc_queue.pop_first();
193 tracing::debug!(%old_epoch, "garbage collected for epoch");
194 }
195 tracing::trace!("Drop write lock for stake table info");
196
197 Ok(stake_table_info)
198 }
199}