espresso_node/persistence/
fs.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    fs::{self, File, OpenOptions},
4    io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5    ops::RangeInclusive,
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::Instant,
9};
10
11use alloy::primitives::Address;
12use anyhow::{Context, anyhow, bail};
13use async_lock::RwLock;
14use async_trait::async_trait;
15use clap::Parser;
16use espresso_types::{
17    AuthenticatedValidatorMap, Leaf, Leaf2, NetworkConfig, Payload, PubKey, RegisteredValidatorMap,
18    SeqTypes, StakeTableHash,
19    traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
20    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
21    v0_3::{
22        AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
23        StakeTableEvent,
24    },
25};
26use hotshot::InitializerEpochInfo;
27use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
28    DhtPersistentStorage, SerializableRecord,
29};
30use hotshot_types::{
31    data::{
32        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
33        QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare, VidDisperseShare0,
34    },
35    drb::{DrbInput, DrbResult},
36    event::{Event, EventType, HotShotAction, LeafInfo},
37    message::{Proposal, convert_proposal},
38    simple_certificate::{
39        CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
40        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
41    },
42    traits::{
43        block_contents::{BlockHeader, BlockPayload},
44        metrics::Metrics,
45        node_implementation::NodeType,
46    },
47    vote::HasViewNumber,
48};
49use itertools::Itertools;
50
51use super::RegisteredValidatorNoX25519;
52use crate::{
53    RECENT_STAKE_TABLES_LIMIT, ViewNumber,
54    persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
55};
56
57/// Deserialize a stake table from bytes, trying current and legacy formats.
58/// Returns (stake_tuple, needs_rewrite) where needs_rewrite=true means legacy format was used.
59fn deserialize_stake_table(bytes: &[u8]) -> anyhow::Result<(StakeTuple, bool)> {
60    // Try current format (RegisteredValidator with x25519 fields).
61    if let Ok(stake) = bincode::deserialize::<StakeTuple>(bytes) {
62        return Ok((stake, false));
63    }
64
65    // Legacy: RegisteredValidator without x25519_key/p2p_addr.
66    type LegacyMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;
67    type LegacyTuple = (LegacyMap, Option<RewardAmount>, Option<StakeTableHash>);
68    let legacy: LegacyTuple = bincode::deserialize(bytes)
69        .context("failed to deserialize stake table (tried current and legacy formats)")?;
70    let migrated: AuthenticatedValidatorMap = legacy
71        .0
72        .into_iter()
73        .map(|(addr, v)| {
74            let registered = v.migrate();
75            (
76                addr,
77                AuthenticatedValidator::try_from(registered)
78                    .expect("stake tables only contain authenticated validators"),
79            )
80        })
81        .collect();
82    Ok(((migrated, legacy.1, legacy.2), true))
83}
84
/// Options for file system backed persistence.
#[derive(Parser, Clone, Debug)]
pub struct Options {
    /// Storage path for persistent data.
    // NOTE(review): this field has no clap default, so it is required; it may also be supplied
    // via the environment variable below. (Plain `//` comments here on purpose: `///` doc
    // comments on clap-derive fields become part of the --help text.)
    #[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
    path: PathBuf,

    /// Number of views to retain in consensus storage before data that hasn't been archived is
    /// garbage collected.
    ///
    /// The longer this is, the more certain that all data will eventually be archived, even if
    /// there are temporary problems with archive storage or partially missing data. This can be set
    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
    /// setting only applies to views which never get decided (ie forks in consensus) and views for
    /// which this node is partially offline. These should be exceptionally rare.
    ///
    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
    /// view time of 2s.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
        default_value = "130000"
    )]
    pub(crate) consensus_view_retention: u64,
}
110
impl Default for Options {
    /// Build options by parsing an empty argument list.
    ///
    /// NOTE(review): `path` has no clap default, so this relies on the
    /// `ESPRESSO_SEQUENCER_STORAGE_PATH` environment variable being set; if it is not,
    /// clap's `parse_from` will report the missing argument and exit the process.
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}
116
117impl Options {
118    pub fn new(path: PathBuf) -> Self {
119        Self {
120            path,
121            consensus_view_retention: 130000,
122        }
123    }
124
125    pub(crate) fn path(&self) -> &Path {
126        &self.path
127    }
128}
129
130#[async_trait]
131impl PersistenceOptions for Options {
132    type Persistence = Persistence;
133
134    fn set_view_retention(&mut self, view_retention: u64) {
135        self.consensus_view_retention = view_retention;
136    }
137
138    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
139        let path = self.path.clone();
140        let view_retention = self.consensus_view_retention;
141
142        let migration_path = path.join("migration");
143        let migrated = if migration_path.is_file() {
144            let bytes = fs::read(&migration_path).context(format!(
145                "unable to read migration from {}",
146                migration_path.display()
147            ))?;
148            bincode::deserialize(&bytes).context("malformed migration file")?
149        } else {
150            HashSet::new()
151        };
152
153        Ok(Persistence {
154            inner: Arc::new(RwLock::new(Inner {
155                path,
156                migrated,
157                view_retention,
158            })),
159            metrics: Arc::new(PersistenceMetricsValue::default()),
160        })
161    }
162
163    async fn reset(self) -> anyhow::Result<()> {
164        todo!()
165    }
166}
167
/// File system backed persistence.
#[derive(Clone, Debug)]
pub struct Persistence {
    // We enforce mutual exclusion on access to the data source, as the current file system
    // implementation does not support transaction isolation for concurrent reads and writes. We can
    // improve this in the future by switching to a SQLite-based file system implementation.
    inner: Arc<RwLock<Inner>>,
    /// A reference to the metrics trait; used to record timing of persistence operations
    /// (e.g. VID share writes).
    metrics: Arc<PersistenceMetricsValue>,
}
178
/// Mutable state guarded by the [`Persistence`] lock: the storage root plus bookkeeping
/// loaded at startup.
#[derive(Debug)]
struct Inner {
    /// Root directory under which all persistence files and subdirectories live.
    path: PathBuf,
    /// Number of views to retain before undecided data is garbage collected.
    view_retention: u64,
    /// Identifiers of migrations recorded as complete (persisted in the `migration` file).
    migrated: HashSet<String>,
}
185
186impl Inner {
    /// Path to the network config file.
    fn config_path(&self) -> PathBuf {
        self.path.join("hotshot.cfg")
    }

    /// Path to the file recording which migrations have completed.
    fn migration(&self) -> PathBuf {
        self.path.join("migration")
    }

    /// Path to the file storing the highest view this node has voted in.
    fn voted_view_path(&self) -> PathBuf {
        self.path.join("highest_voted_view")
    }

    /// Path to the file storing the view to restart consensus from.
    fn restart_view_path(&self) -> PathBuf {
        self.path.join("restart_view")
    }

    /// Path to a directory containing decided leaves.
    fn decided_leaf_path(&self) -> PathBuf {
        self.path.join("decided_leaves")
    }

    /// Path to the directory of decided `Leaf2`s (current format), one file per view.
    fn decided_leaf2_path(&self) -> PathBuf {
        self.path.join("decided_leaves2")
    }

    /// The path from previous versions where there was only a single file for anchor leaves.
    fn legacy_anchor_leaf_path(&self) -> PathBuf {
        self.path.join("anchor_leaf")
    }

    /// Legacy VID share directory.
    fn vid_dir_path(&self) -> PathBuf {
        self.path.join("vid")
    }

    /// Current-format VID share directory, one file per view.
    fn vid2_dir_path(&self) -> PathBuf {
        self.path.join("vid2")
    }

    /// Legacy DA proposal directory.
    fn da_dir_path(&self) -> PathBuf {
        self.path.join("da")
    }

    /// DRB storage directory.
    fn drb_dir_path(&self) -> PathBuf {
        self.path.join("drb")
    }

    /// Current-format DA proposal directory, one file per view.
    fn da2_dir_path(&self) -> PathBuf {
        self.path.join("da2")
    }

    /// Legacy quorum proposal directory.
    fn quorum_proposals_dir_path(&self) -> PathBuf {
        self.path.join("quorum_proposals")
    }

    /// Current-format quorum proposal directory, one file per view.
    fn quorum_proposals2_dir_path(&self) -> PathBuf {
        self.path.join("quorum_proposals2")
    }

    /// Upgrade certificate storage location.
    fn upgrade_certificate_dir_path(&self) -> PathBuf {
        self.path.join("upgrade_certificate")
    }

    /// Stake table storage directory.
    fn stake_table_dir_path(&self) -> PathBuf {
        self.path.join("stake_table")
    }

    /// Next epoch quorum certificate storage location.
    fn next_epoch_qc(&self) -> PathBuf {
        self.path.join("next_epoch_quorum_certificate")
    }

    /// eQC storage location.
    fn eqc(&self) -> PathBuf {
        self.path.join("eqc")
    }

    /// Persisted libp2p DHT records location.
    fn libp2p_dht_path(&self) -> PathBuf {
        self.path.join("libp2p_dht")
    }
    /// Epoch DRB result storage directory.
    fn epoch_drb_result_dir_path(&self) -> PathBuf {
        self.path.join("epoch_drb_result")
    }

    /// Epoch root block header storage directory.
    fn epoch_root_block_header_dir_path(&self) -> PathBuf {
        self.path.join("epoch_root_block_header")
    }

    /// Finalized light client state certificates, one file per epoch.
    fn finalized_state_cert_dir_path(&self) -> PathBuf {
        self.path.join("finalized_state_cert")
    }

    /// Light client state certificates, one file per view.
    fn state_cert_dir_path(&self) -> PathBuf {
        self.path.join("state_cert")
    }
279
280    fn update_migration(&mut self) -> anyhow::Result<()> {
281        let path = self.migration();
282        let bytes = bincode::serialize(&self.migrated)?;
283
284        self.replace(
285            &path,
286            |_| Ok(true),
287            |mut file| {
288                file.write_all(&bytes)?;
289                Ok(())
290            },
291        )
292    }
293
294    /// Overwrite a file if a condition is met.
295    ///
296    /// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred`
297    /// returns `true`, or if there was no existing file, then `write` is called to update the
298    /// contents of the file. `write` receives a truncated file open in write mode and sets the
299    /// contents of the file.
300    ///
301    /// The final replacement of the original file is atomic; that is, `path` will be modified only
302    /// if the entire update succeeds.
303    fn replace(
304        &mut self,
305        path: &Path,
306        pred: impl FnOnce(File) -> anyhow::Result<bool>,
307        write: impl FnOnce(File) -> anyhow::Result<()>,
308    ) -> anyhow::Result<()> {
309        if path.is_file() {
310            // If there is an existing file, check if it is suitable to replace. Note that this
311            // check is not atomic with respect to the subsequent write at the file system level,
312            // but this object is the only one which writes to this file, and we have a mutable
313            // reference, so this should be safe.
314            if !pred(File::open(path)?)? {
315                // If we are not overwriting the file, we are done and consider the whole operation
316                // successful.
317                return Ok(());
318            }
319        }
320
321        // Either there is no existing file or we have decided to overwrite the file. Write the new
322        // contents into a temporary file so we can update `path` atomically using `rename`.
323        let mut swap_path = path.to_owned();
324        swap_path.set_extension("swp");
325        let swap = OpenOptions::new()
326            .write(true)
327            .truncate(true)
328            .create(true)
329            .open(&swap_path)?;
330        write(swap)?;
331
332        // Now we can replace the original file.
333        fs::rename(swap_path, path)?;
334
335        Ok(())
336    }
337
338    fn collect_garbage(
339        &mut self,
340        decided_view: ViewNumber,
341        prune_intervals: &[RangeInclusive<ViewNumber>],
342    ) -> anyhow::Result<()> {
343        let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
344
345        self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
346        self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
347        self.prune_files(
348            self.quorum_proposals2_dir_path(),
349            prune_view,
350            None,
351            prune_intervals,
352        )?;
353        self.prune_files(
354            self.state_cert_dir_path(),
355            prune_view,
356            None,
357            prune_intervals,
358        )?;
359
360        // Save the most recent leaf as it will be our anchor point if the node restarts.
361        self.prune_files(
362            self.decided_leaf2_path(),
363            prune_view,
364            Some(decided_view),
365            prune_intervals,
366        )?;
367
368        Ok(())
369    }
370
371    fn prune_files(
372        &mut self,
373        dir_path: PathBuf,
374        prune_view: ViewNumber,
375        keep_decided_view: Option<ViewNumber>,
376        prune_intervals: &[RangeInclusive<ViewNumber>],
377    ) -> anyhow::Result<()> {
378        if !dir_path.is_dir() {
379            return Ok(());
380        }
381
382        for (file_view, path) in view_files(dir_path)? {
383            // If the view is the anchor view, keep it no matter what.
384            if let Some(decided_view) = keep_decided_view
385                && decided_view == file_view
386            {
387                continue;
388            }
389            // Otherwise, delete it if it is time to prune this view _or_ if the given intervals,
390            // which we've already successfully processed, contain the view; in this case we simply
391            // don't need it anymore.
392            if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
393                fs::remove_file(&path)?;
394            }
395        }
396
397        Ok(())
398    }
399
400    fn parse_decided_leaf(
401        &self,
402        bytes: &[u8],
403    ) -> anyhow::Result<(Leaf2, CertificatePair<SeqTypes>)> {
404        // Old versions of the software did not store the next epoch QC. Without knowing which
405        // version this file was created with, we can simply try parsing both ways and then
406        // reconstruct a certificate pair with or without the next epoch QC.
407        match bincode::deserialize(bytes) {
408            Ok((leaf, cert)) => Ok((leaf, cert)),
409            Err(err) => {
410                tracing::info!(
411                    "error parsing decided leaf, maybe file was created without next epoch QC? \
412                     {err}"
413                );
414                let (leaf, qc) =
415                    bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(bytes)
416                        .context("parsing decided leaf")?;
417                Ok((leaf, CertificatePair::non_epoch_change(qc)))
418            },
419        }
420    }
421
    /// Generate events based on persisted decided leaves.
    ///
    /// Emits one decide event per persisted leaf up to (and including) `view`, attaching the VID
    /// share, finalized state certificate, and full block payload where those are available on
    /// disk. `deciding_qc`, if provided, is attached only to the event whose certificate it
    /// directly extends (view + 1).
    ///
    /// Returns a list of closed intervals of views which can be safely deleted, as all leaves
    /// within these view ranges have been processed by the event consumer.
    async fn generate_decide_events(
        &mut self,
        view: ViewNumber,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
        // Generate a decide event for each leaf, to be processed by the event consumer. We make a
        // separate event for each leaf because it is possible we have non-consecutive leaves in our
        // storage, which would not be valid as a single decide with a single leaf chain.
        let mut leaves = BTreeMap::new();
        for (v, path) in view_files(self.decided_leaf2_path())? {
            // Skip leaves newer than the decide we are processing.
            if v > view {
                continue;
            }

            let bytes =
                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
            let (mut leaf, cert) = self.parse_decided_leaf(&bytes)?;

            // Include the VID share if available.
            let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
            if vid_share.is_none() {
                tracing::debug!(?v, "VID share not available at decide");
            }

            // Move the state cert to the finalized dir if it exists.
            let state_cert = self.store_finalized_state_cert(v)?;

            // Fill in the full block payload using the DA proposals we had persisted.
            if let Some(proposal) = self.load_da_proposal(v)? {
                let payload = Payload::from_bytes(
                    &proposal.data.encoded_transactions,
                    &proposal.data.metadata,
                );
                leaf.fill_block_payload_unchecked(payload);
            } else {
                tracing::debug!(?v, "DA proposal not available at decide");
            }

            let info = LeafInfo {
                leaf,
                vid_share,
                state_cert,
                // Note: the following fields are not used in Decide event processing, and should be
                // removed. For now, we just default them.
                state: Default::default(),
                delta: Default::default(),
            };

            leaves.insert(v, (info, cert));
        }

        // The invariant is that the oldest existing leaf in the `anchor_leaf` table -- if there is
        // one -- was always included in the _previous_ decide event...but not removed from the
        // database, because we always persist the most recent anchor leaf.
        if let Some((oldest_view, _)) = leaves.first_key_value() {
            // The only exception is when the oldest leaf is the genesis leaf; then there was no
            // previous decide event.
            if *oldest_view > ViewNumber::genesis() {
                leaves.pop_first();
            }
        }

        // Emit one decide event per remaining leaf, and simultaneously track maximal runs of
        // consecutive block heights: each run becomes an interval of views the caller may delete.
        let mut intervals = vec![];
        let mut current_interval = None;
        for (view, (leaf, cert)) in leaves {
            let height = leaf.leaf.block_header().block_number();

            {
                // Insert the deciding QC at the appropriate position, with the last decide event in the
                // chain.
                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
                    (deciding_qc.view_number() == cert.view_number() + 1)
                        .then_some(deciding_qc.clone())
                } else {
                    None
                };

                consumer
                    .handle_event(&Event {
                        view_number: view,
                        event: EventType::Decide {
                            committing_qc: Arc::new(cert),
                            deciding_qc,
                            leaf_chain: Arc::new(vec![leaf]),
                            block_size: None,
                        },
                    })
                    .await?;
            }
            if let Some((start, end, current_height)) = current_interval.as_mut() {
                if height == *current_height + 1 {
                    // If we have a chain of consecutive leaves, extend the current interval of
                    // views which are safe to delete.
                    *current_height += 1;
                    *end = view;
                } else {
                    // Otherwise, end the current interval and start a new one.
                    intervals.push(*start..=*end);
                    current_interval = Some((view, view, height));
                }
            } else {
                // Start a new interval.
                current_interval = Some((view, view, height));
            }
        }
        // Flush the final open interval, if any.
        if let Some((start, end, _)) = current_interval {
            intervals.push(start..=end);
        }

        Ok(intervals)
    }
538
539    fn load_da_proposal(
540        &self,
541        view: ViewNumber,
542    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
543        let dir_path = self.da2_dir_path();
544
545        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
546
547        if !file_path.exists() {
548            return Ok(None);
549        }
550
551        let da_bytes = fs::read(file_path)?;
552
553        let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
554            bincode::deserialize(&da_bytes)?;
555        Ok(Some(da_proposal))
556    }
557
558    fn load_vid_share(
559        &self,
560        view: ViewNumber,
561    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
562        let dir_path = self.vid2_dir_path();
563
564        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
565
566        if !file_path.exists() {
567            return Ok(None);
568        }
569
570        let vid_share_bytes = fs::read(file_path)?;
571        let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
572            bincode::deserialize(&vid_share_bytes)?;
573        Ok(Some(vid_share))
574    }
575
576    fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
577        tracing::info!("Checking `Leaf2` to load the anchor leaf.");
578        if self.decided_leaf2_path().is_dir() {
579            let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
580
581            // Return the latest decided leaf.
582            for (_, path) in view_files(self.decided_leaf2_path())? {
583                let bytes =
584                    fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
585                let (leaf, cert) = self.parse_decided_leaf(&bytes)?;
586                if let Some((anchor_leaf, _)) = &anchor {
587                    if leaf.view_number() > anchor_leaf.view_number() {
588                        anchor = Some((leaf, cert.qc().clone()));
589                    }
590                } else {
591                    anchor = Some((leaf, cert.qc().clone()));
592                }
593            }
594
595            return Ok(anchor);
596        }
597
598        tracing::warn!(
599            "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
600             This is very likely to fail."
601        );
602        if self.legacy_anchor_leaf_path().is_file() {
603            // We may have an old version of storage, where there is just a single file for the
604            // anchor leaf. Read it and return the contents.
605            let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
606
607            // The first 8 bytes just contain the height of the leaf. We can skip this.
608            file.seek(SeekFrom::Start(8)).context("seek")?;
609            let bytes = file
610                .bytes()
611                .collect::<Result<Vec<_>, _>>()
612                .context("read")?;
613            return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
614        }
615
616        Ok(None)
617    }
618
    /// Copy the light client state certificate for `view` into the finalized directory,
    /// keyed by epoch, and return the parsed certificate.
    ///
    /// Tries the v2 certificate encoding first, falling back to v1 for files written by older
    /// versions. Returns `Ok(None)` when no certificate was stored for this view. The original
    /// per-view file is left in place; it is removed later when the state cert directory is
    /// garbage collected.
    fn store_finalized_state_cert(
        &mut self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let dir_path = self.state_cert_dir_path();
        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");

        if !file_path.exists() {
            return Ok(None);
        }

        let bytes = fs::read(&file_path)?;

        // Prefer the v2 layout; on failure, log and retry as v1 (converted into v2).
        let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
            bincode::deserialize(&bytes).or_else(|err_v2| {
                tracing::info!(
                    error = %err_v2,
                    path = %file_path.display(),
                    "Failed to deserialize state certificate, attempting with v1"
                );

                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
                    .map(Into::into)
                    .with_context(|| {
                        format!(
                            "Failed to deserialize with both v2 and v1 from file '{}'. error: \
                             {err_v2}",
                            file_path.display()
                        )
                    })
            })?;

        let epoch = state_cert.epoch.u64();
        let finalized_dir_path = self.finalized_state_cert_dir_path();
        fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;

        let finalized_file_path = finalized_dir_path
            .join(epoch.to_string())
            .with_extension("txt");

        // Unconditionally (pred always true) write the raw bytes to the epoch-keyed file,
        // replacing atomically via the swap-and-rename helper.
        self.replace(
            &finalized_file_path,
            |_| Ok(true),
            |mut file| {
                file.write_all(&bytes)?;
                Ok(())
            },
        )
        .context(format!(
            "finalizing light client state update certificate file for epoch {epoch:?}"
        ))?;

        Ok(Some(state_cert))
    }
673}
674
675#[async_trait]
676impl SequencerPersistence for Persistence {
    /// Reward Merkle tree v2 migration hook.
    ///
    /// Intentionally a no-op for the file system backend: it performs no work and always
    /// succeeds.
    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
        Ok(())
    }
680
681    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
682        let inner = self.inner.read().await;
683        let path = inner.config_path();
684        if !path.is_file() {
685            tracing::info!("config not found at {}", path.display());
686            return Ok(None);
687        }
688        tracing::info!("loading config from {}", path.display());
689
690        let bytes =
691            fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
692        let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
693        let json = migrate_network_config(json).context("migration of network config failed")?;
694        let config = serde_json::from_value(json).context("malformed config file")?;
695        Ok(Some(config))
696    }
697
698    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
699        let inner = self.inner.write().await;
700        let path = inner.config_path();
701        tracing::info!("saving config to {}", path.display());
702        Ok(cfg.to_file(path.display().to_string())?)
703    }
704
705    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
706        let inner = self.inner.read().await;
707        let path = inner.voted_view_path();
708        if !path.is_file() {
709            return Ok(None);
710        }
711        let bytes = fs::read(inner.voted_view_path())?
712            .try_into()
713            .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
714        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
715    }
716
717    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
718        let inner = self.inner.read().await;
719        let path = inner.restart_view_path();
720        if !path.is_file() {
721            return Ok(None);
722        }
723        let bytes = fs::read(path)?
724            .try_into()
725            .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
726        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
727    }
728
    /// Persist newly decided leaves, emit decide events, and garbage collect old data.
    ///
    /// Leaves are written one file per view under the `decided_leaves2` directory (after first
    /// migrating any legacy single-file anchor leaf into that directory). Event processing and
    /// garbage collection failures are logged but not returned, since by that point the leaves
    /// themselves have been persisted successfully and both will be retried at the next decide.
    async fn append_decided_leaves(
        &self,
        view: ViewNumber,
        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<()> {
        let mut inner = self.inner.write().await;
        let path = inner.decided_leaf2_path();

        // Ensure the anchor leaf directory exists.
        fs::create_dir_all(&path).context("creating anchor leaf directory")?;

        // Earlier versions stored only a single decided leaf in a regular file. If our storage is
        // still on this version, migrate to a directory structure storing (possibly) many leaves.
        let legacy_path = inner.legacy_anchor_leaf_path();
        if !path.is_dir() && legacy_path.is_file() {
            tracing::info!("migrating to multi-leaf storage");

            // Move the existing data into the new directory.
            let (leaf, qc) = inner
                .load_anchor_leaf()?
                .context("anchor leaf file exists but unable to load contents")?;
            let view = leaf.view_number().u64();
            let bytes = bincode::serialize(&(leaf, qc))?;
            let new_file = path.join(view.to_string()).with_extension("txt");
            inner
                .replace(
                    &new_file,
                    |_| Ok(true),
                    |mut file| {
                        file.write_all(&bytes)?;
                        Ok(())
                    },
                )
                .context(format!("writing anchor leaf file {view}"))?;

            // Now we can remove the old file.
            fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
        }

        // Write each decided leaf to its own view-keyed file, skipping any that already exist.
        for (info, cert) in leaf_chain {
            let view = info.leaf.view_number().u64();
            let file_path = path.join(view.to_string()).with_extension("txt");
            inner.replace(
                &file_path,
                |_| {
                    // Don't overwrite an existing leaf, but warn about it as this is likely not
                    // intended behavior from HotShot.
                    tracing::warn!(view, "duplicate decided leaf");
                    Ok(false)
                },
                |mut file| {
                    let bytes = bincode::serialize(&(&info.leaf, cert))?;
                    file.write_all(&bytes)?;
                    Ok(())
                },
            )?;
        }

        match inner
            .generate_decide_events(view, deciding_qc, consumer)
            .await
        {
            Err(err) => {
                // Event processing failure is not an error, since by this point we have at least
                // managed to persist the decided leaves successfully, and the event processing will
                // just run again at the next decide.
                tracing::warn!(?view, "event processing failed: {err:#}");
            },
            Ok(intervals) => {
                if let Err(err) = inner.collect_garbage(view, &intervals) {
                    // Similarly, garbage collection is not an error. We have done everything we
                    // strictly needed to do, and GC will run again at the next decide. Log the
                    // error but do not return it.
                    tracing::warn!(?view, "GC failed: {err:#}");
                }
            },
        }

        Ok(())
    }
811
812    async fn load_anchor_leaf(
813        &self,
814    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
815        self.inner.read().await.load_anchor_leaf()
816    }
817
818    async fn load_da_proposal(
819        &self,
820        view: ViewNumber,
821    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
822        self.inner.read().await.load_da_proposal(view)
823    }
824
825    async fn load_vid_share(
826        &self,
827        view: ViewNumber,
828    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
829        self.inner.read().await.load_vid_share(view)
830    }
831
832    async fn append_vid(
833        &self,
834        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
835    ) -> anyhow::Result<()> {
836        let mut inner = self.inner.write().await;
837        let view_number = proposal.data.view_number().u64();
838        let dir_path = inner.vid2_dir_path();
839
840        fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
841
842        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
843        inner.replace(
844            &file_path,
845            |_| {
846                // Don't overwrite an existing share, but warn about it as this is likely not intended
847                // behavior from HotShot.
848                tracing::warn!(view_number, "duplicate VID share");
849                Ok(false)
850            },
851            |mut file| {
852                let proposal_bytes = bincode::serialize(proposal).context("serialize proposal")?;
853                let now = Instant::now();
854                file.write_all(&proposal_bytes)?;
855                self.metrics
856                    .internal_append_vid_duration
857                    .add_point(now.elapsed().as_secs_f64());
858                Ok(())
859            },
860        )
861    }
862
863    async fn append_da(
864        &self,
865        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
866        _vid_commit: VidCommitment,
867    ) -> anyhow::Result<()> {
868        let mut inner = self.inner.write().await;
869        let view_number = proposal.data.view_number().u64();
870        let dir_path = inner.da_dir_path();
871
872        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
873
874        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
875        inner.replace(
876            &file_path,
877            |_| {
878                // Don't overwrite an existing proposal, but warn about it as this is likely not
879                // intended behavior from HotShot.
880                tracing::warn!(view_number, "duplicate DA proposal");
881                Ok(false)
882            },
883            |mut file| {
884                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
885                let now = Instant::now();
886                file.write_all(&proposal_bytes)?;
887                self.metrics
888                    .internal_append_da_duration
889                    .add_point(now.elapsed().as_secs_f64());
890                Ok(())
891            },
892        )
893    }
894    async fn record_action(
895        &self,
896        view: ViewNumber,
897        _epoch: Option<EpochNumber>,
898        action: HotShotAction,
899    ) -> anyhow::Result<()> {
900        // Todo Remove this after https://github.com/EspressoSystems/espresso-network/issues/1931
901        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
902            return Ok(());
903        }
904        let mut inner = self.inner.write().await;
905        let path = &inner.voted_view_path();
906        inner.replace(
907            path,
908            |mut file| {
909                let mut bytes = vec![];
910                file.read_to_end(&mut bytes)?;
911                let bytes = bytes
912                    .try_into()
913                    .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
914                let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
915
916                // Overwrite the file if the saved view is older than the new view.
917                Ok(saved_view < view)
918            },
919            |mut file| {
920                file.write_all(&view.u64().to_le_bytes())?;
921                Ok(())
922            },
923        )?;
924
925        if matches!(action, HotShotAction::Vote) {
926            let restart_view_path = &inner.restart_view_path();
927            let restart_view = view + 1;
928            inner.replace(
929                restart_view_path,
930                |mut file| {
931                    let mut bytes = vec![];
932                    file.read_to_end(&mut bytes)?;
933                    let bytes = bytes
934                        .try_into()
935                        .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
936                    let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
937
938                    // Overwrite the file if the saved view is older than the new view.
939                    Ok(saved_view < restart_view)
940                },
941                |mut file| {
942                    file.write_all(&restart_view.u64().to_le_bytes())?;
943                    Ok(())
944                },
945            )?;
946        }
947        Ok(())
948    }
949
950    async fn append_quorum_proposal2(
951        &self,
952        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
953    ) -> anyhow::Result<()> {
954        let mut inner = self.inner.write().await;
955        let view_number = proposal.data.view_number().u64();
956        let dir_path = inner.quorum_proposals2_dir_path();
957
958        fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
959
960        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
961        inner.replace(
962            &file_path,
963            |_| {
964                // Always overwrite the previous file
965                Ok(true)
966            },
967            |mut file| {
968                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
969                let now = Instant::now();
970                file.write_all(&proposal_bytes)?;
971                self.metrics
972                    .internal_append_quorum2_duration
973                    .add_point(now.elapsed().as_secs_f64());
974                Ok(())
975            },
976        )
977    }
    /// Load every persisted quorum proposal, keyed by view number.
    ///
    /// Each file is tried first in the current `QuorumProposalWrapper` encoding,
    /// then in the legacy wrapper encoding (converted on the fly). Files that
    /// parse as neither are logged and skipped so one corrupt entry cannot fail
    /// the whole load.
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
    {
        let inner = self.inner.read().await;

        // First, get the proposal directory.
        let dir_path = inner.quorum_proposals2_dir_path();
        if !dir_path.is_dir() {
            // Nothing persisted yet; an empty map is not an error.
            return Ok(Default::default());
        }

        // Read quorum proposals from every data file in this directory.
        let mut map = BTreeMap::new();
        for (view, path) in view_files(&dir_path)? {
            let proposal_bytes = fs::read(path)?;
            let Some(proposal) = bincode::deserialize::<
                Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
            >(&proposal_bytes)
            .or_else(|error| {
                // Fall back to the legacy on-disk encoding and convert it.
                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
                    &proposal_bytes,
                )
                .map(convert_proposal)
                .inspect_err(|err_v3| {
                    // At this point, if the file contents are invalid, it is most likely an
                    // error rather than a miscellaneous file somehow ending up in the
                    // directory. However, we continue on, because it is better to collect as
                    // many proposals as we can rather than letting one bad proposal cause the
                    // entire operation to fail, and it is still possible that this was just
                    // some unintended file whose name happened to match the naming convention.

                    tracing::warn!(
                        ?view,
                        %error,
                        error_v3 = %err_v3,
                        "ignoring malformed quorum proposal file"
                    );
                })
            })
            .ok() else {
                continue;
            };

            let proposal2 = convert_proposal(proposal);

            // Push to the map and we're done.
            map.insert(view, proposal2);
        }

        Ok(map)
    }
1030
1031    async fn load_quorum_proposal(
1032        &self,
1033        view: ViewNumber,
1034    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1035        let inner = self.inner.read().await;
1036        let dir_path = inner.quorum_proposals2_dir_path();
1037        let file_path = dir_path.join(view.to_string()).with_extension("txt");
1038        let bytes = fs::read(file_path)?;
1039        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1040            bincode::deserialize(&bytes).or_else(|error| {
1041                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1042                    &bytes,
1043                )
1044                .map(convert_proposal)
1045                .context(format!(
1046                    "Failed to deserialize quorum proposal for view {view:?}: {error}."
1047                ))
1048            })?;
1049        Ok(proposal)
1050    }
1051
1052    async fn load_upgrade_certificate(
1053        &self,
1054    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1055        let inner = self.inner.read().await;
1056        let path = inner.upgrade_certificate_dir_path();
1057        if !path.is_file() {
1058            return Ok(None);
1059        }
1060        let bytes = fs::read(&path).context("read")?;
1061        Ok(Some(
1062            bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1063        ))
1064    }
1065
1066    async fn store_upgrade_certificate(
1067        &self,
1068        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1069    ) -> anyhow::Result<()> {
1070        let mut inner = self.inner.write().await;
1071        let path = &inner.upgrade_certificate_dir_path();
1072        let certificate = match decided_upgrade_certificate {
1073            Some(cert) => cert,
1074            None => return Ok(()),
1075        };
1076        inner.replace(
1077            path,
1078            |_| {
1079                // Always overwrite the previous file.
1080                Ok(true)
1081            },
1082            |mut file| {
1083                let bytes =
1084                    bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1085                file.write_all(&bytes)?;
1086                Ok(())
1087            },
1088        )
1089    }
1090
1091    async fn store_next_epoch_quorum_certificate(
1092        &self,
1093        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1094    ) -> anyhow::Result<()> {
1095        let mut inner = self.inner.write().await;
1096        let path = &inner.next_epoch_qc();
1097
1098        inner.replace(
1099            path,
1100            |_| {
1101                // Always overwrite the previous file.
1102                Ok(true)
1103            },
1104            |mut file| {
1105                let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1106                file.write_all(&bytes)?;
1107                Ok(())
1108            },
1109        )
1110    }
1111
1112    async fn load_next_epoch_quorum_certificate(
1113        &self,
1114    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1115        let inner = self.inner.read().await;
1116        let path = inner.next_epoch_qc();
1117        if !path.is_file() {
1118            return Ok(None);
1119        }
1120        let bytes = fs::read(&path).context("read")?;
1121        Ok(Some(
1122            bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1123        ))
1124    }
1125
1126    async fn store_eqc(
1127        &self,
1128        high_qc: QuorumCertificate2<SeqTypes>,
1129        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1130    ) -> anyhow::Result<()> {
1131        let mut inner = self.inner.write().await;
1132        let path = &inner.eqc();
1133
1134        inner.replace(
1135            path,
1136            |_| {
1137                // Always overwrite the previous file.
1138                Ok(true)
1139            },
1140            |mut file| {
1141                let bytes = bincode::serialize(&(high_qc, next_epoch_high_qc))
1142                    .context("serializing next epoch qc")?;
1143                file.write_all(&bytes)?;
1144                Ok(())
1145            },
1146        )
1147    }
1148
1149    async fn load_eqc(
1150        &self,
1151    ) -> Option<(
1152        QuorumCertificate2<SeqTypes>,
1153        NextEpochQuorumCertificate2<SeqTypes>,
1154    )> {
1155        let inner = self.inner.read().await;
1156        let path = inner.eqc();
1157        if !path.is_file() {
1158            return None;
1159        }
1160        let bytes = fs::read(&path).ok()?;
1161
1162        bincode::deserialize(&bytes).ok()
1163    }
1164
1165    async fn append_da2(
1166        &self,
1167        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1168        _vid_commit: VidCommitment,
1169    ) -> anyhow::Result<()> {
1170        let mut inner = self.inner.write().await;
1171        let view_number = proposal.data.view_number().u64();
1172        let dir_path = inner.da2_dir_path();
1173
1174        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
1175
1176        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
1177        inner.replace(
1178            &file_path,
1179            |_| {
1180                // Don't overwrite an existing proposal, but warn about it as this is likely not
1181                // intended behavior from HotShot.
1182                tracing::warn!(view_number, "duplicate DA proposal");
1183                Ok(false)
1184            },
1185            |mut file| {
1186                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1187                let now = Instant::now();
1188                file.write_all(&proposal_bytes)?;
1189                self.metrics
1190                    .internal_append_da2_duration
1191                    .add_point(now.elapsed().as_secs_f64());
1192                Ok(())
1193            },
1194        )
1195    }
1196
    /// Trait-required alias: forwards directly to [`Self::append_quorum_proposal2`].
    async fn append_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()> {
        self.append_quorum_proposal2(proposal).await
    }
1203
1204    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1205        let mut inner = self.inner.write().await;
1206
1207        if inner.migrated.contains("anchor_leaf") {
1208            tracing::info!("decided leaves already migrated");
1209            return Ok(());
1210        }
1211
1212        let new_leaf_dir = inner.decided_leaf2_path();
1213
1214        fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2  dir")?;
1215
1216        let old_leaf_dir = inner.decided_leaf_path();
1217        if !old_leaf_dir.is_dir() {
1218            return Ok(());
1219        }
1220
1221        tracing::warn!("migrating decided leaves..");
1222        for entry in fs::read_dir(old_leaf_dir)? {
1223            let entry = entry?;
1224            let path = entry.path();
1225
1226            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1227                continue;
1228            };
1229            let Ok(view) = file.parse::<u64>() else {
1230                continue;
1231            };
1232
1233            let bytes =
1234                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1235            let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1236                .context(format!("parsing decided leaf {}", path.display()))?;
1237
1238            let leaf2: Leaf2 = leaf.into();
1239            let cert = CertificatePair::non_epoch_change(qc.to_qc2());
1240
1241            let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1242
1243            inner.replace(
1244                &new_leaf_path,
1245                |_| {
1246                    tracing::warn!(view, "duplicate decided leaf");
1247                    Ok(false)
1248                },
1249                |mut file| {
1250                    let bytes = bincode::serialize(&(&leaf2.clone(), cert))?;
1251                    file.write_all(&bytes)?;
1252                    Ok(())
1253                },
1254            )?;
1255
1256            if view % 100 == 0 {
1257                tracing::info!(view, "decided leaves migration progress");
1258            }
1259        }
1260
1261        inner.migrated.insert("anchor_leaf".to_string());
1262        inner.update_migration()?;
1263        tracing::warn!("successfully migrated decided leaves");
1264        Ok(())
1265    }
1266    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1267        let mut inner = self.inner.write().await;
1268
1269        if inner.migrated.contains("da_proposal") {
1270            tracing::info!("da proposals already migrated");
1271            return Ok(());
1272        }
1273
1274        let new_da_dir = inner.da2_dir_path();
1275
1276        fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;
1277
1278        let old_da_dir = inner.da_dir_path();
1279        if !old_da_dir.is_dir() {
1280            return Ok(());
1281        }
1282
1283        tracing::warn!("migrating da proposals..");
1284
1285        for entry in fs::read_dir(old_da_dir)? {
1286            let entry = entry?;
1287            let path = entry.path();
1288
1289            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1290                continue;
1291            };
1292            let Ok(view) = file.parse::<u64>() else {
1293                continue;
1294            };
1295
1296            let bytes =
1297                fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
1298            let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
1299                .context(format!("parsing da proposal {}", path.display()))?;
1300
1301            let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");
1302
1303            let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);
1304
1305            inner.replace(
1306                &new_da_path,
1307                |_| {
1308                    tracing::warn!(view, "duplicate DA proposal 2");
1309                    Ok(false)
1310                },
1311                |mut file| {
1312                    let bytes = bincode::serialize(&proposal2)?;
1313                    file.write_all(&bytes)?;
1314                    Ok(())
1315                },
1316            )?;
1317
1318            if view % 100 == 0 {
1319                tracing::info!(view, "DA proposals migration progress");
1320            }
1321        }
1322
1323        inner.migrated.insert("da_proposal".to_string());
1324        inner.update_migration()?;
1325        tracing::warn!("successfully migrated da proposals");
1326        Ok(())
1327    }
1328    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1329        let mut inner = self.inner.write().await;
1330
1331        if inner.migrated.contains("vid_share") {
1332            tracing::info!("vid shares already migrated");
1333            return Ok(());
1334        }
1335
1336        let new_vid_dir = inner.vid2_dir_path();
1337
1338        fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;
1339
1340        let old_vid_dir = inner.vid_dir_path();
1341        if !old_vid_dir.is_dir() {
1342            return Ok(());
1343        }
1344
1345        tracing::warn!("migrating vid shares..");
1346
1347        for entry in fs::read_dir(old_vid_dir)? {
1348            let entry = entry?;
1349            let path = entry.path();
1350
1351            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1352                continue;
1353            };
1354            let Ok(view) = file.parse::<u64>() else {
1355                continue;
1356            };
1357
1358            let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
1359            let proposal =
1360                bincode::deserialize::<Proposal<SeqTypes, VidDisperseShare0<SeqTypes>>>(&bytes)
1361                    .context(format!("parsing vid share {}", path.display()))?;
1362
1363            let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");
1364
1365            let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1366                convert_proposal(proposal);
1367
1368            inner.replace(
1369                &new_vid_path,
1370                |_| {
1371                    tracing::warn!(view, "duplicate VID share ");
1372                    Ok(false)
1373                },
1374                |mut file| {
1375                    let bytes = bincode::serialize(&proposal2)?;
1376                    file.write_all(&bytes)?;
1377                    Ok(())
1378                },
1379            )?;
1380
1381            if view % 100 == 0 {
1382                tracing::info!(view, "VID shares migration progress");
1383            }
1384        }
1385
1386        inner.migrated.insert("vid_share".to_string());
1387        inner.update_migration()?;
1388        tracing::warn!("successfully migrated vid shares");
1389        Ok(())
1390    }
1391
1392    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1393        let mut inner = self.inner.write().await;
1394
1395        if inner.migrated.contains("quorum_proposals") {
1396            tracing::info!("quorum proposals already migrated");
1397            return Ok(());
1398        }
1399
1400        let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();
1401
1402        fs::create_dir_all(new_quorum_proposals_dir.clone())
1403            .context("failed to create quorum proposals 2 dir")?;
1404
1405        let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
1406        if !old_quorum_proposals_dir.is_dir() {
1407            tracing::info!("no existing quorum proposals found for migration");
1408            return Ok(());
1409        }
1410
1411        tracing::warn!("migrating quorum proposals..");
1412        for entry in fs::read_dir(old_quorum_proposals_dir)? {
1413            let entry = entry?;
1414            let path = entry.path();
1415
1416            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1417                continue;
1418            };
1419            let Ok(view) = file.parse::<u64>() else {
1420                continue;
1421            };
1422
1423            let bytes =
1424                fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
1425            let proposal =
1426                bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
1427                    .context(format!("parsing quorum proposal {}", path.display()))?;
1428
1429            let new_file_path = new_quorum_proposals_dir
1430                .join(view.to_string())
1431                .with_extension("txt");
1432
1433            let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1434                convert_proposal(proposal);
1435
1436            inner.replace(
1437                &new_file_path,
1438                |_| {
1439                    tracing::warn!(view, "duplicate Quorum proposal2 ");
1440                    Ok(false)
1441                },
1442                |mut file| {
1443                    let bytes = bincode::serialize(&proposal2)?;
1444                    file.write_all(&bytes)?;
1445                    Ok(())
1446                },
1447            )?;
1448
1449            if view % 100 == 0 {
1450                tracing::info!(view, "Quorum proposals migration progress");
1451            }
1452        }
1453
1454        inner.migrated.insert("quorum_proposals".to_string());
1455        inner.update_migration()?;
1456        tracing::warn!("successfully migrated quorum proposals");
1457        Ok(())
1458    }
    /// Intentional no-op: quorum certificates require no on-disk migration in
    /// this storage backend.
    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
        Ok(())
    }
1462
    /// One-time migration adding x25519 key fields to persisted stake tables and
    /// validator JSON files.
    ///
    /// Each affected file is rewritten atomically: the new contents go to a
    /// `.tmp` sibling which is then renamed over the original. Idempotent via
    /// the `migrated` set.
    async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
        let mut inner = self.inner.write().await;

        if inner.migrated.contains("x25519_keys") {
            tracing::info!("x25519_keys migration already complete");
            return Ok(());
        }

        let path = inner.stake_table_dir_path();
        if !path.is_dir() {
            // No stake tables on disk at all: nothing to rewrite, so record the
            // migration as complete immediately.
            inner.migrated.insert("x25519_keys".to_string());
            inner.update_migration()?;
            return Ok(());
        }

        tracing::warn!("migrating stake tables to add x25519 key fields...");

        for (epoch, file_path) in epoch_files(&path)? {
            let bytes = fs::read(&file_path).with_context(|| {
                format!("failed to read stake table file at {}", file_path.display())
            })?;

            // `deserialize_stake_table` reports whether the file was in the
            // legacy (pre-x25519) encoding and therefore needs rewriting.
            let (stake, needs_rewrite) = deserialize_stake_table(&bytes).with_context(|| {
                format!(
                    "failed to deserialize stake table at {}",
                    file_path.display()
                )
            })?;

            if !needs_rewrite {
                continue;
            }

            // Atomic write: write to temp, then rename
            let new_bytes = bincode::serialize(&stake)?;
            let tmp_path = file_path.with_extension("txt.tmp");
            fs::write(&tmp_path, new_bytes)?;
            fs::rename(&tmp_path, &file_path)?;

            tracing::info!(?epoch, "migrated stake table");
        }

        // Also migrate validators JSON files (used by store_all_validators)
        let validators_dir = path.join("validators");
        if validators_dir.is_dir() {
            type LegacyJsonMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;

            for entry in fs::read_dir(&validators_dir)? {
                let entry = entry?;
                let file_path = entry.path();
                if file_path.extension().is_some_and(|ext| ext == "json") {
                    let content = fs::read_to_string(&file_path)?;

                    // Try current format first
                    if serde_json::from_str::<RegisteredValidatorMap>(&content).is_ok() {
                        continue;
                    }

                    // Migrate from legacy format (no x25519_key/p2p_addr)
                    let legacy: LegacyJsonMap =
                        serde_json::from_str(&content).with_context(|| {
                            format!(
                                "failed to deserialize validators at {} (tried both formats)",
                                file_path.display()
                            )
                        })?;

                    let migrated: RegisteredValidatorMap = legacy
                        .into_iter()
                        .map(|(addr, v)| (addr, v.migrate()))
                        .collect();

                    // Atomic write: write to temp, then rename
                    let new_json = serde_json::to_string_pretty(&migrated)?;
                    let tmp_path = file_path.with_extension("json.tmp");
                    fs::write(&tmp_path, new_json)?;
                    fs::rename(&tmp_path, &file_path)?;

                    tracing::info!(?file_path, "migrated validators file");
                }
            }
        }

        inner.migrated.insert("x25519_keys".to_string());
        inner.update_migration()?;
        tracing::warn!("x25519_keys migration complete");
        Ok(())
    }
1551
1552    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1553        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1554            if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
1555                tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
1556            } else if loaded_drb_input.iteration >= drb_input.iteration {
1557                anyhow::bail!(
1558                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1559                    loaded_drb_input,
1560                    drb_input
1561                )
1562            }
1563        }
1564
1565        let mut inner = self.inner.write().await;
1566        let dir_path = inner.drb_dir_path();
1567
1568        fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1569
1570        let drb_input_bytes =
1571            bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1572
1573        let file_path = dir_path
1574            .join(drb_input.epoch.to_string())
1575            .with_extension("bin");
1576
1577        inner.replace(
1578            &file_path,
1579            |_| {
1580                // Always overwrite the previous file.
1581                Ok(true)
1582            },
1583            |mut file| {
1584                file.write_all(&drb_input_bytes).context(format!(
1585                    "writing epoch drb_input file for epoch {:?} at {:?}",
1586                    drb_input.epoch, file_path
1587                ))
1588            },
1589        )
1590    }
1591
1592    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1593        let inner = self.inner.read().await;
1594        let path = &inner.drb_dir_path();
1595        let file_path = path.join(epoch.to_string()).with_extension("bin");
1596        let bytes = fs::read(&file_path).context("read")?;
1597        Ok(bincode::deserialize(&bytes)
1598            .context(format!("failed to deserialize DrbInput for epoch {epoch}"))?)
1599    }
1600
1601    async fn store_drb_result(
1602        &self,
1603        epoch: EpochNumber,
1604        drb_result: DrbResult,
1605    ) -> anyhow::Result<()> {
1606        let mut inner = self.inner.write().await;
1607        let dir_path = inner.epoch_drb_result_dir_path();
1608
1609        fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1610
1611        let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1612
1613        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1614
1615        inner.replace(
1616            &file_path,
1617            |_| {
1618                // Always overwrite the previous file.
1619                Ok(true)
1620            },
1621            |mut file| {
1622                file.write_all(&drb_result_bytes)
1623                    .context(format!("writing epoch drb result file for epoch {epoch:?}"))
1624            },
1625        )
1626    }
1627
1628    async fn store_epoch_root(
1629        &self,
1630        epoch: EpochNumber,
1631        block_header: <SeqTypes as NodeType>::BlockHeader,
1632    ) -> anyhow::Result<()> {
1633        let mut inner = self.inner.write().await;
1634        let dir_path = inner.epoch_root_block_header_dir_path();
1635
1636        fs::create_dir_all(dir_path.clone())
1637            .context("failed to create epoch root block header dir")?;
1638
1639        let block_header_bytes =
1640            bincode::serialize(&block_header).context("serialize block header")?;
1641
1642        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1643        inner
1644            .replace(
1645                &file_path,
1646                |_| Ok(true),
1647                |mut file| {
1648                    file.write_all(&block_header_bytes)?;
1649                    Ok(())
1650                },
1651            )
1652            .context(format!(
1653                "writing epoch root block header file for epoch {epoch:?}"
1654            ))?;
1655
1656        Ok(())
1657    }
1658
1659    async fn add_state_cert(
1660        &self,
1661        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1662    ) -> anyhow::Result<()> {
1663        let mut inner = self.inner.write().await;
1664        // let epoch = state_cert.epoch;
1665        let view = state_cert.light_client_state.view_number;
1666        let dir_path = inner.state_cert_dir_path();
1667
1668        fs::create_dir_all(dir_path.clone())
1669            .context("failed to create light client state update certificate dir")?;
1670
1671        let bytes = bincode::serialize(&state_cert)
1672            .context("serialize light client state update certificate")?;
1673
1674        let file_path = dir_path.join(view.to_string()).with_extension("txt");
1675        inner
1676            .replace(
1677                &file_path,
1678                |_| Ok(true),
1679                |mut file| {
1680                    file.write_all(&bytes)?;
1681                    Ok(())
1682                },
1683            )
1684            .context(format!(
1685                "writing light client state update certificate file for view {view:?}"
1686            ))?;
1687
1688        Ok(())
1689    }
1690
1691    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1692        let inner = self.inner.read().await;
1693        let drb_dir_path = inner.epoch_drb_result_dir_path();
1694        let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1695
1696        let mut result = Vec::new();
1697
1698        if !drb_dir_path.is_dir() {
1699            return Ok(Vec::new());
1700        }
1701        for (epoch, path) in epoch_files(drb_dir_path)? {
1702            let bytes =
1703                fs::read(&path).context(format!("reading epoch drb result {}", path.display()))?;
1704            let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1705                .context(format!("parsing epoch drb result {}", path.display()))?;
1706
1707            let block_header_path = block_header_dir_path
1708                .join(epoch.to_string())
1709                .with_extension("txt");
1710            let block_header = if block_header_path.is_file() {
1711                let bytes = fs::read(&block_header_path).context(format!(
1712                    "reading epoch root block header {}",
1713                    block_header_path.display()
1714                ))?;
1715                Some(
1716                    bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes).context(
1717                        format!(
1718                            "parsing epoch root block header {}",
1719                            block_header_path.display()
1720                        ),
1721                    )?,
1722                )
1723            } else {
1724                None
1725            };
1726
1727            result.push(InitializerEpochInfo::<SeqTypes> {
1728                epoch,
1729                drb_result,
1730                block_header,
1731            });
1732        }
1733
1734        result.sort_by(|a, b| a.epoch.cmp(&b.epoch));
1735
1736        // Keep only the most recent epochs
1737        let start = result
1738            .len()
1739            .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1740        let recent = result[start..].to_vec();
1741
1742        Ok(recent)
1743    }
1744
    /// Load the finalized light client state update certificate with the
    /// highest epoch, or `None` if none have been persisted.
    ///
    /// Files are first decoded as V2 certificates; on failure the same bytes
    /// are retried as V1 and converted, so certificates written by older
    /// software versions still load.
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let inner = self.inner.read().await;
        let dir_path = inner.finalized_state_cert_dir_path();

        // Missing directory means nothing was ever stored.
        if !dir_path.is_dir() {
            return Ok(None);
        }

        let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;

        for (epoch, path) in epoch_files(dir_path)? {
            // Only the certificate with the highest epoch is kept; skip files
            // that cannot beat the current best without reading them.
            if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
                continue;
            }
            let bytes = fs::read(&path).context(format!(
                "reading light client state update certificate {}",
                path.display()
            ))?;
            // Try V2 first; fall back to V1 (converted via `Into`) and fail
            // only if neither format matches.
            let cert =
                bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
                    .or_else(|error| {
                        tracing::info!(
                            %error,
                            path = %path.display(),
                            "Failed to deserialize LightClientStateUpdateCertificateV2"
                        );

                        bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
                            &bytes,
                        )
                        .map(Into::into)
                        .with_context(|| {
                            format!(
                                "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
                                path.display()
                            )
                        })
                    })?;

            result = Some(cert);
        }

        Ok(result)
    }
1791
    /// Load the finalized light client state update certificate for a
    /// specific `epoch`, or `None` if no file exists for it.
    ///
    /// Like [`Self::load_state_cert`], the bytes are decoded as V2 first and
    /// retried as V1 (then converted) for backward compatibility.
    async fn get_state_cert_by_epoch(
        &self,
        epoch: u64,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let inner = self.inner.read().await;
        let dir_path = inner.finalized_state_cert_dir_path();

        // Certificates are stored one file per epoch: `<epoch>.txt`.
        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");

        if !file_path.exists() {
            return Ok(None);
        }

        let bytes = fs::read(&file_path).context(format!(
            "reading light client state update certificate {}",
            file_path.display()
        ))?;

        // V2 first, then the legacy V1 layout converted via `Into`.
        let cert = bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
            .or_else(|error| {
                tracing::info!(
                    %error,
                    path = %file_path.display(),
                    "Failed to deserialize LightClientStateUpdateCertificateV2"
                );

                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
                    .map(Into::into)
                    .with_context(|| {
                        format!(
                            "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
                            file_path.display()
                        )
                    })
            })?;

        Ok(Some(cert))
    }
1830
1831    async fn insert_state_cert(
1832        &self,
1833        epoch: u64,
1834        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1835    ) -> anyhow::Result<()> {
1836        let inner = self.inner.read().await;
1837        let dir_path = inner.finalized_state_cert_dir_path();
1838
1839        fs::create_dir_all(&dir_path)
1840            .context(format!("creating state cert dir {}", dir_path.display()))?;
1841
1842        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1843        let bytes = bincode::serialize(&cert)
1844            .context("serializing light client state update certificate")?;
1845
1846        fs::write(&file_path, bytes).context(format!(
1847            "writing light client state update certificate {}",
1848            file_path.display()
1849        ))?;
1850
1851        Ok(())
1852    }
1853
    /// Intentionally a no-op: metrics collection is not implemented for this
    /// file-system persistence backend yet.
    fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
        // todo!()
    }
1857}
1858
1859#[async_trait]
1860impl MembershipPersistence for Persistence {
1861    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
1862        let inner = self.inner.read().await;
1863        let path = &inner.stake_table_dir_path();
1864        let file_path = path.join(epoch.to_string()).with_extension("txt");
1865
1866        if !file_path.exists() {
1867            return Ok(None);
1868        }
1869
1870        let bytes = fs::read(&file_path).with_context(|| {
1871            format!("failed to read stake table file at {}", file_path.display())
1872        })?;
1873
1874        let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1875            format!(
1876                "failed to deserialize stake table at {}",
1877                file_path.display()
1878            )
1879        })?;
1880        Ok(Some(stake))
1881    }
1882
1883    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
1884        let limit = limit as usize;
1885        let inner = self.inner.read().await;
1886        let path = &inner.stake_table_dir_path();
1887        let sorted_files = epoch_files(path)?
1888            .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
1889            .take(limit);
1890        let mut validator_sets: Vec<IndexedStake> = Vec::new();
1891
1892        for (epoch, file_path) in sorted_files {
1893            let bytes = fs::read(&file_path).with_context(|| {
1894                format!("failed to read stake table file at {}", file_path.display())
1895            })?;
1896
1897            let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1898                format!(
1899                    "failed to deserialize stake table at {}",
1900                    file_path.display()
1901                )
1902            })?;
1903            validator_sets.push((epoch, (stake.0, stake.1), stake.2));
1904        }
1905
1906        Ok(Some(validator_sets))
1907    }
1908
1909    async fn store_stake(
1910        &self,
1911        epoch: EpochNumber,
1912        stake: AuthenticatedValidatorMap,
1913        block_reward: Option<RewardAmount>,
1914        stake_table_hash: Option<StakeTableHash>,
1915    ) -> anyhow::Result<()> {
1916        let mut inner = self.inner.write().await;
1917        let dir_path = &inner.stake_table_dir_path();
1918
1919        fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1920
1921        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1922
1923        inner.replace(
1924            &file_path,
1925            |_| {
1926                // Always overwrite the previous file.
1927                Ok(true)
1928            },
1929            |mut file| {
1930                let data: StakeTuple = (stake, block_reward, stake_table_hash);
1931                let bytes =
1932                    bincode::serialize(&data).context("serializing combined stake table")?;
1933                file.write_all(&bytes)?;
1934                Ok(())
1935            },
1936        )
1937    }
1938
    /// Store stake table events up to (and including) the given L1 block.
    ///
    /// Each event is written to its own pretty-printed JSON file named
    /// `<l1_block>_<log_index>.json`; files that already exist are left
    /// untouched. Afterwards the highest processed L1 block is recorded as a
    /// little-endian `u64` in `last_l1_finalized.bin`. If the recorded block
    /// is already ahead of `to_l1_block`, the call is a no-op.
    async fn store_events(
        &self,
        to_l1_block: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()> {
        let mut inner = self.inner.write().await;
        let dir_path = &inner.stake_table_dir_path();
        let events_dir = dir_path.join("events");

        fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
        // Read the last l1 finalized for which events has been stored
        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");

        // check if the last l1 events is higher than the incoming one
        if last_l1_finalized_path.exists() {
            let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
                format!("Failed to read file at path: {last_l1_finalized_path:?}")
            })?;
            // The file holds a single little-endian u64.
            let mut buf = [0; 8];
            bytes
                .as_slice()
                .read_exact(&mut buf[..8])
                .with_context(|| {
                    format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
                })?;
            let persisted_l1_block = u64::from_le_bytes(buf);
            if persisted_l1_block > to_l1_block {
                tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
                return Ok(());
            }
        }

        // stores each event in a separate file
        // this can cause performance issue when, for example, reading all the files
        // However, the plan is to remove file system completely in future
        for (event_key, event) in events {
            let (block_number, event_index) = event_key;
            // file name is like block_index.json
            let filename = format!("{block_number}_{event_index}");
            let file_path = events_dir.join(filename).with_extension("json");

            // Events already on disk are skipped rather than rewritten.
            if file_path.exists() {
                continue;
            }

            inner
                .replace(
                    &file_path,
                    |_| Ok(true),
                    |file| {
                        let writer = BufWriter::new(file);

                        serde_json::to_writer_pretty(writer, &event)?;
                        Ok(())
                    },
                )
                .context("Failed to write event to file")?;
        }

        // update the l1 block for which we have processed events
        inner.replace(
            &last_l1_finalized_path,
            |_| Ok(true),
            |mut file| {
                let bytes = to_l1_block.to_le_bytes();

                file.write_all(&bytes)?;
                tracing::debug!("updated l1 finalized ={to_l1_block:?}");
                Ok(())
            },
        )
    }
2012
    /// Loads all events from persistent storage in the L1 block range
    /// `from_l1_block..=to_l1_block` (capped at the last block for which
    /// events were actually persisted).
    ///
    /// # Returns
    ///
    /// Returns a tuple containing:
    /// - `Option<EventsPersistenceRead>` - `Complete` when events up to the
    ///   requested `to_l1_block` were available; `UntilL1Block(b)` when
    ///   persistence only covers blocks up to `b < to_l1_block`; `None` when
    ///   no events have been persisted at all.
    /// - `Vec<(EventKey, StakeTableEvent)>` - The matching events, sorted by
    ///   event key, where an event key is (l1 block number, log index).
    ///
    async fn load_events(
        &self,
        from_l1_block: u64,
        to_l1_block: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )> {
        let inner = self.inner.read().await;
        let dir_path = inner.stake_table_dir_path();
        let events_dir = dir_path.join("events");

        // check if we have any events in storage
        // we can do this by checking last l1 finalized block for which we processed events
        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");

        if !last_l1_finalized_path.exists() || !events_dir.exists() {
            return Ok((None, Vec::new()));
        }

        let mut events = Vec::new();

        // The marker file holds a single little-endian u64.
        let bytes = fs::read(&last_l1_finalized_path)
            .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
        let mut buf = [0; 8];
        bytes
            .as_slice()
            .read_exact(&mut buf[..8])
            .with_context(|| {
                format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
            })?;

        let last_processed_l1_block = u64::from_le_bytes(buf);

        // Determine the L1 block for querying events.
        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
        // Otherwise, query up to the last stored block.
        let query_l1_block = if last_processed_l1_block > to_l1_block {
            to_l1_block
        } else {
            last_processed_l1_block
        };

        // Scan every `<block>_<log_index>.json` file in the events directory
        // and keep the ones whose block falls inside the query range.
        for entry in fs::read_dir(&events_dir).context("events directory")? {
            let entry = entry?;
            let path = entry.path();

            if !entry.file_type()?.is_file() {
                continue;
            }

            if path
                .extension()
                .context(format!("extension for path={path:?}"))?
                != "json"
            {
                continue;
            }

            // File names encode the event key as `<block>_<log_index>`;
            // anything that doesn't match is silently skipped.
            let filename = path
                .file_stem()
                .and_then(|f| f.to_str())
                .unwrap_or_default();

            let parts: Vec<&str> = filename.split('_').collect();
            if parts.len() != 2 {
                continue;
            }

            let block_number = parts[0].parse::<u64>()?;
            let log_index = parts[1].parse::<u64>()?;

            if block_number < from_l1_block || block_number > query_l1_block {
                continue;
            }

            let file =
                File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
            let reader = BufReader::new(file);

            let event: StakeTableEvent = serde_json::from_reader(reader)
                .context(format!("Failed to deserialize event at path={path:?}"))?;

            events.push(((block_number, log_index), event));
        }

        // Directory iteration order is unspecified, so sort by event key.
        events.sort_by_key(|(key, _)| *key);

        if query_l1_block == to_l1_block {
            Ok((Some(EventsPersistenceRead::Complete), events))
        } else {
            Ok((
                Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
                events,
            ))
        }
    }
2120
2121    async fn delete_stake_tables(&self) -> anyhow::Result<()> {
2122        let inner = self.inner.write().await;
2123        let events_dir = inner.stake_table_dir_path().join("events");
2124        if events_dir.exists() {
2125            fs::remove_dir_all(&events_dir)
2126                .with_context(|| format!("Failed to remove events dir: {events_dir:?}"))?;
2127        }
2128        let validators_dir = inner.stake_table_dir_path().join("validators");
2129        if validators_dir.exists() {
2130            fs::remove_dir_all(&validators_dir)
2131                .with_context(|| format!("Failed to remove validators dir: {validators_dir:?}"))?;
2132        }
2133        let drb_dir = inner.epoch_drb_result_dir_path();
2134        if drb_dir.exists() {
2135            fs::remove_dir_all(&drb_dir)
2136                .with_context(|| format!("Failed to remove epoch DRB result dir: {drb_dir:?}"))?;
2137        }
2138        Ok(())
2139    }
2140
2141    async fn store_all_validators(
2142        &self,
2143        epoch: EpochNumber,
2144        all_validators: RegisteredValidatorMap,
2145    ) -> anyhow::Result<()> {
2146        let mut inner = self.inner.write().await;
2147        let dir_path = inner.stake_table_dir_path();
2148        let validators_dir = dir_path.join("validators");
2149
2150        // Ensure validators directory exists
2151        fs::create_dir_all(&validators_dir)
2152            .with_context(|| format!("Failed to create validators dir: {validators_dir:?}"))?;
2153
2154        // Path = validators/epoch_<number>.json
2155        let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2156
2157        inner
2158            .replace(
2159                &file_path,
2160                |_| Ok(true),
2161                |file| {
2162                    let writer = BufWriter::new(file);
2163
2164                    serde_json::to_writer_pretty(writer, &all_validators).with_context(|| {
2165                        format!("Failed to serialize validators for epoch {epoch}")
2166                    })?;
2167                    Ok(())
2168                },
2169            )
2170            .with_context(|| format!("Failed to write validator file: {file_path:?}"))?;
2171
2172        Ok(())
2173    }
2174
2175    async fn load_all_validators(
2176        &self,
2177        epoch: EpochNumber,
2178        offset: u64,
2179        limit: u64,
2180    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
2181        let inner = self.inner.read().await;
2182        let dir_path = inner.stake_table_dir_path();
2183        let validators_dir = dir_path.join("validators");
2184        let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2185
2186        if !file_path.exists() {
2187            bail!("Validator file not found for epoch {epoch}");
2188        }
2189
2190        let file = File::open(&file_path)
2191            .with_context(|| format!("Failed to open validator file: {file_path:?}"))?;
2192        let reader = BufReader::new(file);
2193
2194        let map: RegisteredValidatorMap = serde_json::from_reader(reader).with_context(|| {
2195            format!("Failed to deserialize validators at {file_path:?}. epoch = {epoch}")
2196        })?;
2197
2198        let mut values: Vec<RegisteredValidator<PubKey>> = map.into_values().collect();
2199        values.sort_by_key(|v| v.account);
2200
2201        let start = offset as usize;
2202        let end = (start + limit as usize).min(values.len());
2203
2204        if start >= values.len() {
2205            return Ok(vec![]);
2206        }
2207
2208        Ok(values[start..end].to_vec())
2209    }
2210}
2211
2212#[async_trait]
2213impl DhtPersistentStorage for Persistence {
2214    /// Save the DHT to the file on disk
2215    ///
2216    /// # Errors
2217    /// - If we fail to serialize the records
2218    /// - If we fail to write the serialized records to the file
2219    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2220        // Bincode-serialize the records
2221        let to_save =
2222            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2223
2224        // Get the path to save the file to
2225        let path = self.inner.read().await.libp2p_dht_path();
2226
2227        // Create the directory if it doesn't exist
2228        fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
2229            .with_context(|| "failed to create directory")?;
2230
2231        // Get a write lock on the inner struct
2232        let mut inner = self.inner.write().await;
2233
2234        // Save the file, replacing the previous one if it exists
2235        inner
2236            .replace(
2237                &path,
2238                |_| {
2239                    // Always overwrite the previous file
2240                    Ok(true)
2241                },
2242                |mut file| {
2243                    file.write_all(&to_save)
2244                        .with_context(|| "failed to write records to file")?;
2245                    Ok(())
2246                },
2247            )
2248            .with_context(|| "failed to save records to file")?;
2249
2250        Ok(())
2251    }
2252
2253    /// Load the DHT from the file on disk
2254    ///
2255    /// # Errors
2256    /// - If we fail to read the file
2257    /// - If we fail to deserialize the records
2258    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2259        // Read the contents of the file
2260        let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
2261            .with_context(|| "Failed to read records from file")?;
2262
2263        // Deserialize the contents
2264        let records: Vec<SerializableRecord> =
2265            bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
2266
2267        Ok(records)
2268    }
2269}
2270
2271/// Get all paths under `dir` whose name is of the form <view number>.txt.
2272fn view_files(
2273    dir: impl AsRef<Path>,
2274) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
2275    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2276        let dir = dir.as_ref().display();
2277        let entry = entry.ok()?;
2278        if !entry.file_type().ok()?.is_file() {
2279            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2280            return None;
2281        }
2282        let path = entry.path();
2283        if path.extension()? != "txt" {
2284            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2285            return None;
2286        }
2287        let file_name = path.file_stem()?;
2288        let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2289            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2290            return None;
2291        };
2292        Some((ViewNumber::new(view_number), entry.path().to_owned()))
2293    }))
2294}
2295
2296/// Get all paths under `dir` whose name is of the form <epoch number>.txt.
2297/// Should probably be made generic and merged with view_files.
2298fn epoch_files(
2299    dir: impl AsRef<Path>,
2300) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2301    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2302        let dir = dir.as_ref().display();
2303        let entry = entry.ok()?;
2304        if !entry.file_type().ok()?.is_file() {
2305            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2306            return None;
2307        }
2308        let path = entry.path();
2309        if path.extension()? != "txt" {
2310            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2311            return None;
2312        }
2313        let file_name = path.file_stem()?;
2314        let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2315            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2316            return None;
2317        };
2318        Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2319    }))
2320}
2321
2322#[cfg(test)]
2323mod test {
2324    use std::marker::PhantomData;
2325
2326    use committable::{Commitment, CommitmentBoundsArkless, Committable};
2327    use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2328    use hotshot::types::SignatureKey;
2329    use hotshot_example_types::node_types::TEST_VERSIONS;
2330    use hotshot_query_service::testing::mocks::MOCK_UPGRADE;
2331    use hotshot_types::{
2332        data::QuorumProposal2,
2333        light_client::LightClientState,
2334        simple_certificate::QuorumCertificate,
2335        simple_vote::QuorumData,
2336        traits::{EncodeBytes, block_contents::GENESIS_VID_NUM_STORAGE_NODES},
2337        vid::advz::advz_scheme,
2338    };
2339    use jf_advz::VidScheme;
2340    use serde_json::json;
2341    use tempfile::TempDir;
2342
2343    use super::*;
2344    use crate::{BLSPubKey, persistence::tests::TestablePersistence};
2345
    #[async_trait]
    impl TestablePersistence for Persistence {
        // Each test gets its own temporary directory, removed when dropped.
        type Storage = TempDir;

        async fn tmp_storage() -> Self::Storage {
            TempDir::new().unwrap()
        }

        // Build persistence options rooted at the temp directory's path.
        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
            Options::new(storage.path().into())
        }
    }
2358
    // Legacy configs stored a single `builder_url`; migration must turn it
    // into a one-element `builder_urls` list and backfill the newer
    // epoch/DRB/DA-committee fields with defaults.
    #[test]
    fn test_config_migrations_add_builder_urls() {
        let before = json!({
            "config": {
                "builder_url": "https://test:8080",
                "start_proposing_view": 1,
                "stop_proposing_view": 2,
                "start_voting_view": 1,
                "stop_voting_view": 2,
                "start_proposing_time": 1,
                "stop_proposing_time": 2,
                "start_voting_time": 1,
                "stop_voting_time": 2
            }
        });
        let after = json!({
            "config": {
                "builder_urls": ["https://test:8080"],
                "start_proposing_view": 1,
                "stop_proposing_view": 2,
                "start_voting_view": 1,
                "stop_voting_view": 2,
                "start_proposing_time": 1,
                "stop_proposing_time": 2,
                "start_voting_time": 1,
                "stop_voting_time": 2,
                "epoch_height": 0,
                "drb_difficulty": 0,
                "drb_upgrade_difficulty": 0,
                "da_committees": [],
            }
        });

        assert_eq!(migrate_network_config(before).unwrap(), after);
    }
2394
    // A config that already has `builder_urls` (and all newer fields) must
    // pass through the migration unchanged.
    #[test]
    fn test_config_migrations_existing_builder_urls() {
        let before = json!({
            "config": {
                "builder_urls": ["https://test:8080", "https://test:8081"],
                "start_proposing_view": 1,
                "stop_proposing_view": 2,
                "start_voting_view": 1,
                "stop_voting_view": 2,
                "start_proposing_time": 1,
                "stop_proposing_time": 2,
                "start_voting_time": 1,
                "stop_voting_time": 2,
                "epoch_height": 0,
                "drb_difficulty": 0,
                "drb_upgrade_difficulty": 0,
                "da_committees": [],
            }
        });

        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
    }
2417
    // A config missing the upgrade-window parameters must have them filled in
    // with the sentinel defaults (start = 2^53 - 1, stop = 0) plus the newer
    // epoch/DRB/DA-committee fields.
    #[test]
    fn test_config_migrations_add_upgrade_params() {
        let before = json!({
            "config": {
                "builder_urls": ["https://test:8080", "https://test:8081"]
            }
        });
        let after = json!({
            "config": {
                "builder_urls": ["https://test:8080", "https://test:8081"],
                "start_proposing_view": 9007199254740991u64,
                "stop_proposing_view": 0,
                "start_voting_view": 9007199254740991u64,
                "stop_voting_view": 0,
                "start_proposing_time": 9007199254740991u64,
                "stop_proposing_time": 0,
                "start_voting_time": 9007199254740991u64,
                "stop_voting_time": 0,
                "epoch_height": 0,
                "drb_difficulty": 0,
                "drb_upgrade_difficulty": 0,
                "da_committees": [],
            }
        });

        assert_eq!(migrate_network_config(before).unwrap(), after);
    }
2445
2446    #[test]
2447    fn test_config_migrations_existing_upgrade_params() {
2448        let before = json!({
2449            "config": {
2450                "builder_urls": ["https://test:8080", "https://test:8081"],
2451                "start_proposing_view": 1,
2452                "stop_proposing_view": 2,
2453                "start_voting_view": 1,
2454                "stop_voting_view": 2,
2455                "start_proposing_time": 1,
2456                "stop_proposing_time": 2,
2457                "start_voting_time": 1,
2458                "stop_voting_time": 2,
2459                "epoch_height": 0,
2460                "drb_difficulty": 0,
2461                "drb_upgrade_difficulty": 0,
2462                "da_committees": [],
2463            }
2464        });
2465
2466        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2467    }
2468
2469    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2470    pub async fn test_consensus_migration() {
2471        let rows = 300;
2472        let tmp = Persistence::tmp_storage().await;
2473        let mut opt = Persistence::options(&tmp);
2474        let storage = opt.create().await.unwrap();
2475
2476        let inner = storage.inner.read().await;
2477
2478        let decided_leaves_path = inner.decided_leaf_path();
2479        fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create proposals dir");
2480
2481        let qp_dir_path = inner.quorum_proposals_dir_path();
2482        fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");
2483
2484        let state_cert_dir_path = inner.state_cert_dir_path();
2485        fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
2486        drop(inner);
2487
2488        assert!(storage.load_state_cert().await.unwrap().is_none());
2489
2490        for i in 0..rows {
2491            let view = ViewNumber::new(i);
2492            let validated_state = ValidatedState::default();
2493            let instance_state = NodeState::default();
2494
2495            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
2496            let (payload, metadata) =
2497                Payload::from_transactions([], &validated_state, &instance_state)
2498                    .await
2499                    .unwrap();
2500
2501            let payload_bytes = payload.encode();
2502
2503            let block_header = Header::genesis(
2504                &instance_state,
2505                payload.clone(),
2506                &metadata,
2507                TEST_VERSIONS.test.base,
2508            );
2509
2510            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
2511                epoch: EpochNumber::new(i),
2512                light_client_state: LightClientState {
2513                    view_number: i,
2514                    block_height: i,
2515                    block_comm_root: Default::default(),
2516                },
2517                next_stake_table_state: Default::default(),
2518                signatures: vec![], // filling arbitrary value
2519                auth_root: Default::default(),
2520            };
2521            assert!(storage.add_state_cert(state_cert).await.is_ok());
2522
2523            let null_quorum_data = QuorumData {
2524                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
2525            };
2526
2527            let justify_qc = QuorumCertificate::new(
2528                null_quorum_data.clone(),
2529                null_quorum_data.commit(),
2530                view,
2531                None,
2532                PhantomData,
2533            );
2534
2535            let quorum_proposal = QuorumProposal {
2536                block_header,
2537                view_number: view,
2538                justify_qc: justify_qc.clone(),
2539                upgrade_certificate: None,
2540                proposal_certificate: None,
2541            };
2542
2543            let quorum_proposal_signature =
2544                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2545                    .expect("Failed to sign quorum proposal");
2546
2547            let proposal = Proposal {
2548                data: quorum_proposal.clone(),
2549                signature: quorum_proposal_signature,
2550                _pd: PhantomData::<SeqTypes>,
2551            };
2552
2553            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
2554            leaf.fill_block_payload(
2555                payload,
2556                GENESIS_VID_NUM_STORAGE_NODES,
2557                TEST_VERSIONS.test.base,
2558            )
2559            .unwrap();
2560
2561            let mut inner = storage.inner.write().await;
2562
2563            tracing::debug!("inserting decided leaves");
2564            let file_path = decided_leaves_path
2565                .join(view.to_string())
2566                .with_extension("txt");
2567
2568            tracing::debug!("inserting decided leaves");
2569
2570            inner
2571                .replace(
2572                    &file_path,
2573                    |_| Ok(true),
2574                    |mut file| {
2575                        let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
2576                        file.write_all(&bytes)?;
2577                        Ok(())
2578                    },
2579                )
2580                .expect("replace decided leaves");
2581
2582            let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");
2583
2584            tracing::debug!("inserting qc for {view}");
2585
2586            inner
2587                .replace(
2588                    &file_path,
2589                    |_| Ok(true),
2590                    |mut file| {
2591                        let proposal_bytes =
2592                            bincode::serialize(&proposal).context("serialize proposal")?;
2593
2594                        file.write_all(&proposal_bytes)?;
2595                        Ok(())
2596                    },
2597                )
2598                .unwrap();
2599
2600            drop(inner);
2601            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
2602                .disperse(payload_bytes.clone())
2603                .unwrap();
2604
2605            let vid = VidDisperseShare0::<SeqTypes> {
2606                view_number: ViewNumber::new(i),
2607                payload_commitment: Default::default(),
2608                share: disperse.shares[0].clone(),
2609                common: disperse.common,
2610                recipient_key: pubkey,
2611            };
2612
2613            let (payload, metadata) =
2614                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
2615                    .await
2616                    .unwrap();
2617
2618            let da = DaProposal::<SeqTypes> {
2619                encoded_transactions: payload.encode(),
2620                metadata,
2621                view_number: ViewNumber::new(i),
2622            };
2623
2624            let block_payload_signature =
2625                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
2626
2627            let da_proposal = Proposal {
2628                data: da,
2629                signature: block_payload_signature,
2630                _pd: Default::default(),
2631            };
2632
2633            tracing::debug!("inserting vid for {view}");
2634            storage
2635                .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
2636                .await
2637                .unwrap();
2638
2639            tracing::debug!("inserting da for {view}");
2640            storage
2641                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
2642                .await
2643                .unwrap();
2644        }
2645
2646        storage.migrate_storage().await.unwrap();
2647        let inner = storage.inner.read().await;
2648        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2649        let decided_leaves_count = decided_leaves
2650            .filter_map(Result::ok)
2651            .filter(|e| e.path().is_file())
2652            .count();
2653        assert_eq!(
2654            decided_leaves_count, rows as usize,
2655            "decided leaves count does not match",
2656        );
2657
2658        let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
2659        let da_proposals_count = da_proposals
2660            .filter_map(Result::ok)
2661            .filter(|e| e.path().is_file())
2662            .count();
2663        assert_eq!(
2664            da_proposals_count, rows as usize,
2665            "da proposals does not match",
2666        );
2667
2668        let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
2669        let vids_count = vids
2670            .filter_map(Result::ok)
2671            .filter(|e| e.path().is_file())
2672            .count();
2673        assert_eq!(vids_count, rows as usize, "vid shares count does not match",);
2674
2675        let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
2676        let qps_count = qps
2677            .filter_map(Result::ok)
2678            .filter(|e| e.path().is_file())
2679            .count();
2680        assert_eq!(
2681            qps_count, rows as usize,
2682            "quorum proposals count does not match",
2683        );
2684
2685        let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
2686        let state_cert_count = state_certs
2687            .filter_map(Result::ok)
2688            .filter(|e| e.path().is_file())
2689            .count();
2690        assert_eq!(
2691            state_cert_count, rows as usize,
2692            "light client state update certificate count does not match",
2693        );
2694
2695        // Reinitialize the file system persistence using the same path.
2696        // re run the consensus migration.
2697        // No changes will occur, as the migration has already been completed.
2698        let storage = opt.create().await.unwrap();
2699        storage.migrate_storage().await.unwrap();
2700
2701        let inner = storage.inner.read().await;
2702        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2703        let decided_leaves_count = decided_leaves
2704            .filter_map(Result::ok)
2705            .filter(|e| e.path().is_file())
2706            .count();
2707        assert_eq!(
2708            decided_leaves_count, rows as usize,
2709            "decided leaves count does not match",
2710        );
2711    }
2712
2713    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2714    async fn test_load_quorum_proposals_invalid_extension() {
2715        let tmp = Persistence::tmp_storage().await;
2716        let storage = Persistence::connect(&tmp).await;
2717
2718        // Generate a couple of valid quorum proposals.
2719        let leaf = Leaf2::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base).await;
2720        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2721        let signature = PubKey::sign(&privkey, &[]).unwrap();
2722        let mut quorum_proposal = Proposal {
2723            data: QuorumProposalWrapper::<SeqTypes> {
2724                proposal: QuorumProposal2::<SeqTypes> {
2725                    epoch: None,
2726                    block_header: leaf.block_header().clone(),
2727                    view_number: ViewNumber::genesis(),
2728                    justify_qc: QuorumCertificate2::genesis(
2729                        &Default::default(),
2730                        &NodeState::mock(),
2731                        TEST_VERSIONS.test,
2732                    )
2733                    .await,
2734                    upgrade_certificate: None,
2735                    view_change_evidence: None,
2736                    next_drb_result: None,
2737                    next_epoch_justify_qc: None,
2738                    state_cert: None,
2739                },
2740            },
2741            signature,
2742            _pd: Default::default(),
2743        };
2744
2745        // Store quorum proposals.
2746        let quorum_proposal1 = quorum_proposal.clone();
2747        storage
2748            .append_quorum_proposal2(&quorum_proposal1)
2749            .await
2750            .unwrap();
2751        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2752        let quorum_proposal2 = quorum_proposal.clone();
2753        storage
2754            .append_quorum_proposal2(&quorum_proposal2)
2755            .await
2756            .unwrap();
2757
2758        // Change one of the file extensions. It can happen that we end up with files with the wrong
2759        // extension if, for example, the node is killed before cleaning up a swap file.
2760        fs::rename(
2761            tmp.path().join("quorum_proposals2/1.txt"),
2762            tmp.path().join("quorum_proposals2/1.swp"),
2763        )
2764        .unwrap();
2765
2766        // Loading should simply ignore the unrecognized extension.
2767        assert_eq!(
2768            storage.load_quorum_proposals().await.unwrap(),
2769            [(ViewNumber::genesis(), quorum_proposal1)]
2770                .into_iter()
2771                .collect::<BTreeMap<_, _>>()
2772        );
2773    }
2774
2775    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2776    async fn test_load_quorum_proposals_malformed_data() {
2777        let tmp = Persistence::tmp_storage().await;
2778        let storage = Persistence::connect(&tmp).await;
2779
2780        // Generate a valid quorum proposal.
2781        let leaf: Leaf2 = Leaf::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base)
2782            .await
2783            .into();
2784        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2785        let signature = PubKey::sign(&privkey, &[]).unwrap();
2786        let quorum_proposal = Proposal {
2787            data: QuorumProposalWrapper::<SeqTypes> {
2788                proposal: QuorumProposal2::<SeqTypes> {
2789                    epoch: None,
2790                    block_header: leaf.block_header().clone(),
2791                    view_number: ViewNumber::new(1),
2792                    justify_qc: QuorumCertificate2::genesis(
2793                        &Default::default(),
2794                        &NodeState::mock(),
2795                        TEST_VERSIONS.test,
2796                    )
2797                    .await,
2798                    upgrade_certificate: None,
2799                    view_change_evidence: None,
2800                    next_drb_result: None,
2801                    next_epoch_justify_qc: None,
2802                    state_cert: None,
2803                },
2804            },
2805            signature,
2806            _pd: Default::default(),
2807        };
2808
2809        // First store an invalid quorum proposal.
2810        fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2811        fs::write(
2812            tmp.path().join("quorum_proposals2/0.txt"),
2813            "invalid data".as_bytes(),
2814        )
2815        .unwrap();
2816
2817        // Store valid quorum proposal.
2818        storage
2819            .append_quorum_proposal2(&quorum_proposal)
2820            .await
2821            .unwrap();
2822
2823        // Loading should ignore the invalid data and return the valid proposal.
2824        assert_eq!(
2825            storage.load_quorum_proposals().await.unwrap(),
2826            [(ViewNumber::new(1), quorum_proposal)]
2827                .into_iter()
2828                .collect::<BTreeMap<_, _>>()
2829        );
2830    }
2831
2832    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2833    async fn test_store_events_empty() {
2834        let tmp = Persistence::tmp_storage().await;
2835        let mut opt = Persistence::options(&tmp);
2836        let storage = opt.create().await.unwrap();
2837
2838        assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
2839
2840        // Storing an empty events list still updates the latest L1 block.
2841        for i in 1..=2 {
2842            tracing::info!(i, "update l1 height");
2843            storage.store_events(i, vec![]).await.unwrap();
2844            assert_eq!(
2845                storage.load_events(0, 100).await.unwrap(),
2846                (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
2847            );
2848        }
2849    }
2850
2851    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2852    async fn test_migrate_x25519_keys() {
2853        use std::collections::HashMap;
2854
2855        use alloy::primitives::{Address, U256};
2856        use indexmap::IndexMap;
2857
2858        use crate::persistence::RegisteredValidatorNoX25519;
2859
2860        let tmp = Persistence::tmp_storage().await;
2861        let mut opt = Persistence::options(&tmp);
2862        let storage = opt.create().await.unwrap();
2863
2864        let addr = Address::random();
2865        let legacy_validator = RegisteredValidatorNoX25519 {
2866            account: addr,
2867            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2868            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2869            stake: U256::from(1000),
2870            commission: 100,
2871            delegators: HashMap::new(),
2872            authenticated: true,
2873        };
2874
2875        // Serialize bincode stake table in legacy format (no x25519 fields)
2876        let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2877        legacy_map.insert(addr, legacy_validator);
2878
2879        type LegacyTuple = (
2880            IndexMap<Address, RegisteredValidatorNoX25519>,
2881            Option<RewardAmount>,
2882            Option<StakeTableHash>,
2883        );
2884        let legacy_data: LegacyTuple = (legacy_map, None, None);
2885        let bytes = bincode::serialize(&legacy_data).unwrap();
2886
2887        let inner = storage.inner.read().await;
2888        let path = inner.stake_table_dir_path();
2889        drop(inner);
2890        fs::create_dir_all(&path).unwrap();
2891        fs::write(path.join("1.txt"), &bytes).unwrap();
2892
2893        // Also create a JSON validators file without x25519 fields
2894        let json_validator = RegisteredValidatorNoX25519 {
2895            account: addr,
2896            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2897            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2898            stake: U256::from(2000),
2899            commission: 200,
2900            delegators: HashMap::new(),
2901            authenticated: true,
2902        };
2903        let mut json_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2904        json_map.insert(addr, json_validator);
2905        let validators_dir = path.join("validators");
2906        fs::create_dir_all(&validators_dir).unwrap();
2907        let json_path = validators_dir.join("epoch_1.json");
2908        fs::write(&json_path, serde_json::to_string_pretty(&json_map).unwrap()).unwrap();
2909
2910        // Run migration
2911        storage.migrate_x25519_keys().await.unwrap();
2912
2913        // Bincode file should now be loadable as current format
2914        let result = storage.load_stake(EpochNumber::new(1)).await.unwrap();
2915        assert!(result.is_some());
2916        let (validators, reward, hash) = result.unwrap();
2917        assert_eq!(validators.len(), 1);
2918        let v = validators.get(&addr).unwrap();
2919        assert_eq!(v.stake, U256::from(1000));
2920        assert!(v.x25519_key.is_none());
2921        assert!(v.p2p_addr.is_none());
2922        assert!(reward.is_none());
2923        assert!(hash.is_none());
2924
2925        // JSON file should now be loadable as RegisteredValidatorMap
2926        let json_content = fs::read_to_string(&json_path).unwrap();
2927        let parsed: espresso_types::RegisteredValidatorMap =
2928            serde_json::from_str(&json_content).unwrap();
2929        assert_eq!(parsed.len(), 1);
2930        let json_v = parsed.get(&addr).unwrap();
2931        assert_eq!(json_v.stake, U256::from(2000));
2932        assert!(json_v.x25519_key.is_none());
2933
2934        // Idempotent: running again is a no-op
2935        storage.migrate_x25519_keys().await.unwrap();
2936        let result2 = storage.load_stake(EpochNumber::new(1)).await;
2937        assert!(result2.is_ok());
2938    }
2939
2940    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2941    async fn test_migrate_x25519_keys_no_stake_dir() {
2942        let tmp = Persistence::tmp_storage().await;
2943        let mut opt = Persistence::options(&tmp);
2944        let storage = opt.create().await.unwrap();
2945
2946        // No stake_table dir exists. Migration should succeed.
2947        storage.migrate_x25519_keys().await.unwrap();
2948
2949        // Create stake_table dir with garbage data.
2950        let inner = storage.inner.read().await;
2951        let path = inner.stake_table_dir_path();
2952        drop(inner);
2953        fs::create_dir_all(&path).unwrap();
2954        fs::write(path.join("1.txt"), b"garbage").unwrap();
2955
2956        // If migration was properly marked done, this is a no-op.
2957        storage.migrate_x25519_keys().await.unwrap();
2958    }
2959
2960    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2961    async fn test_store_all_validators_authenticated_and_unauthenticated() {
2962        use std::collections::HashMap;
2963
2964        use alloy::primitives::{Address, U256};
2965        use espresso_types::v0_3::RegisteredValidator;
2966        use indexmap::IndexMap;
2967
2968        let tmp = Persistence::tmp_storage().await;
2969        let mut opt = Persistence::options(&tmp);
2970        let storage = opt.create().await.unwrap();
2971
2972        // Create an authenticated validator
2973        let authenticated_validator = RegisteredValidator {
2974            account: Address::random(),
2975            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2976            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2977            stake: U256::from(1000),
2978            commission: 100,
2979            delegators: HashMap::new(),
2980            authenticated: true,
2981            x25519_key: None,
2982            p2p_addr: None,
2983        };
2984
2985        // Create an unauthenticated validator
2986        let unauthenticated_validator = RegisteredValidator {
2987            account: Address::random(),
2988            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
2989            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2990            stake: U256::from(2000),
2991            commission: 200,
2992            delegators: HashMap::new(),
2993            authenticated: false,
2994            x25519_key: None,
2995            p2p_addr: None,
2996        };
2997
2998        let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
2999        validators.insert(
3000            authenticated_validator.account,
3001            authenticated_validator.clone(),
3002        );
3003        validators.insert(
3004            unauthenticated_validator.account,
3005            unauthenticated_validator.clone(),
3006        );
3007
3008        // Store both validators
3009        storage
3010            .store_all_validators(EpochNumber::new(1), validators)
3011            .await
3012            .unwrap();
3013
3014        // Load and verify
3015        let loaded = storage
3016            .load_all_validators(EpochNumber::new(1), 0, 100)
3017            .await
3018            .unwrap();
3019        assert_eq!(loaded.len(), 2);
3020
3021        // Find each validator and verify authenticated state is preserved
3022        let loaded_auth = loaded
3023            .iter()
3024            .find(|v| v.account == authenticated_validator.account)
3025            .unwrap();
3026        assert!(
3027            loaded_auth.authenticated,
3028            "authenticated validator should remain authenticated"
3029        );
3030
3031        let loaded_unauth = loaded
3032            .iter()
3033            .find(|v| v.account == unauthenticated_validator.account)
3034            .unwrap();
3035        assert!(
3036            !loaded_unauth.authenticated,
3037            "unauthenticated validator should remain unauthenticated"
3038        );
3039    }
3040}