Skip to main content

espresso_node/persistence/
fs.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    fs::{self, File, OpenOptions},
4    io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5    ops::RangeInclusive,
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::Instant,
9};
10
11use alloy::primitives::Address;
12use anyhow::{Context, anyhow, bail};
13use async_lock::RwLock;
14use async_trait::async_trait;
15use clap::Parser;
16use espresso_types::{
17    AuthenticatedValidatorMap, Leaf, Leaf2, NetworkConfig, Payload, PubKey, RegisteredValidatorMap,
18    SeqTypes, StakeTableHash,
19    traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
20    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
21    v0_3::{
22        AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
23        StakeTableEvent,
24    },
25};
26use hotshot::InitializerEpochInfo;
27use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
28    DhtPersistentStorage, SerializableRecord,
29};
30use hotshot_new_protocol::message::Certificate2;
31use hotshot_types::{
32    data::{
33        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
34        QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare, VidDisperseShare0,
35    },
36    drb::{DrbInput, DrbResult},
37    event::{Event, EventType, HotShotAction, LeafInfo},
38    message::{Proposal, convert_proposal},
39    new_protocol::CoordinatorEvent,
40    simple_certificate::{
41        CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
42        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
43    },
44    traits::{
45        block_contents::{BlockHeader, BlockPayload},
46        metrics::Metrics,
47        node_implementation::NodeType,
48    },
49    vote::HasViewNumber,
50};
51use itertools::Itertools;
52
53use super::RegisteredValidatorNoX25519;
54use crate::{
55    RECENT_STAKE_TABLES_LIMIT, ViewNumber,
56    persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
57};
58
59/// Deserialize a stake table from bytes, trying current and legacy formats.
60/// Returns (stake_tuple, needs_rewrite) where needs_rewrite=true means legacy format was used.
61fn deserialize_stake_table(bytes: &[u8]) -> anyhow::Result<(StakeTuple, bool)> {
62    // Try current format (RegisteredValidator with x25519 fields).
63    if let Ok(stake) = bincode::deserialize::<StakeTuple>(bytes) {
64        return Ok((stake, false));
65    }
66
67    // Legacy: RegisteredValidator without x25519_key/p2p_addr.
68    type LegacyMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;
69    type LegacyTuple = (LegacyMap, Option<RewardAmount>, Option<StakeTableHash>);
70    let legacy: LegacyTuple = bincode::deserialize(bytes)
71        .context("failed to deserialize stake table (tried current and legacy formats)")?;
72    let migrated: AuthenticatedValidatorMap = legacy
73        .0
74        .into_iter()
75        .map(|(addr, v)| {
76            let registered = v.migrate();
77            (
78                addr,
79                AuthenticatedValidator::try_from(registered)
80                    .expect("stake tables only contain authenticated validators"),
81            )
82        })
83        .collect();
84    Ok(((migrated, legacy.1, legacy.2), true))
85}
86
87/// Options for file system backed persistence.
88#[derive(Parser, Clone, Debug)]
89pub struct Options {
90    /// Storage path for persistent data.
91    #[clap(long, env = "ESPRESSO_NODE_STORAGE_PATH")]
92    pub(crate) path: PathBuf,
93
94    /// Number of views to retain in consensus storage before data that hasn't been archived is
95    /// garbage collected.
96    ///
97    /// The longer this is, the more certain that all data will eventually be archived, even if
98    /// there are temporary problems with archive storage or partially missing data. This can be set
99    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
100    /// setting only applies to views which never get decided (ie forks in consensus) and views for
101    /// which this node is partially offline. These should be exceptionally rare.
102    ///
103    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
104    /// view time of 2s.
105    #[clap(
106        long,
107        env = "ESPRESSO_NODE_CONSENSUS_VIEW_RETENTION",
108        default_value = "130000"
109    )]
110    pub(crate) consensus_view_retention: u64,
111}
112
113impl Default for Options {
114    fn default() -> Self {
115        Self::parse_from(std::iter::empty::<String>())
116    }
117}
118
119impl Options {
120    pub fn new(path: PathBuf) -> Self {
121        Self {
122            path,
123            consensus_view_retention: 130000,
124        }
125    }
126
127    pub(crate) fn path(&self) -> &Path {
128        &self.path
129    }
130}
131
132#[async_trait]
133impl PersistenceOptions for Options {
134    type Persistence = Persistence;
135
136    fn set_view_retention(&mut self, view_retention: u64) {
137        self.consensus_view_retention = view_retention;
138    }
139
140    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
141        let path = self.path.clone();
142        let view_retention = self.consensus_view_retention;
143
144        let migration_path = path.join("migration");
145        let migrated = if migration_path.is_file() {
146            let bytes = fs::read(&migration_path).context(format!(
147                "unable to read migration from {}",
148                migration_path.display()
149            ))?;
150            bincode::deserialize(&bytes).context("malformed migration file")?
151        } else {
152            HashSet::new()
153        };
154
155        Ok(Persistence {
156            inner: Arc::new(RwLock::new(Inner {
157                path,
158                migrated,
159                view_retention,
160            })),
161            metrics: Arc::new(PersistenceMetricsValue::default()),
162        })
163    }
164
165    async fn reset(self) -> anyhow::Result<()> {
166        todo!()
167    }
168}
169
170/// File system backed persistence.
171#[derive(Clone, Debug)]
172pub struct Persistence {
173    // We enforce mutual exclusion on access to the data source, as the current file system
174    // implementation does not support transaction isolation for concurrent reads and writes. We can
175    // improve this in the future by switching to a SQLite-based file system implementation.
176    inner: Arc<RwLock<Inner>>,
177    /// A reference to the metrics trait
178    metrics: Arc<PersistenceMetricsValue>,
179}
180
181#[derive(Debug)]
182struct Inner {
183    path: PathBuf,
184    view_retention: u64,
185    migrated: HashSet<String>,
186}
187
188impl Inner {
189    fn config_path(&self) -> PathBuf {
190        self.path.join("hotshot.cfg")
191    }
192
193    fn migration(&self) -> PathBuf {
194        self.path.join("migration")
195    }
196
197    fn voted_view_path(&self) -> PathBuf {
198        self.path.join("highest_voted_view")
199    }
200
201    fn restart_view_path(&self) -> PathBuf {
202        self.path.join("restart_view")
203    }
204
205    /// Path to a directory containing decided leaves.
206    fn decided_leaf_path(&self) -> PathBuf {
207        self.path.join("decided_leaves")
208    }
209
210    fn decided_leaf2_path(&self) -> PathBuf {
211        self.path.join("decided_leaves2")
212    }
213
214    /// The path from previous versions where there was only a single file for anchor leaves.
215    fn legacy_anchor_leaf_path(&self) -> PathBuf {
216        self.path.join("anchor_leaf")
217    }
218
219    fn vid_dir_path(&self) -> PathBuf {
220        self.path.join("vid")
221    }
222
223    fn vid2_dir_path(&self) -> PathBuf {
224        self.path.join("vid2")
225    }
226
227    fn da_dir_path(&self) -> PathBuf {
228        self.path.join("da")
229    }
230
231    fn drb_dir_path(&self) -> PathBuf {
232        self.path.join("drb")
233    }
234
235    fn da2_dir_path(&self) -> PathBuf {
236        self.path.join("da2")
237    }
238
239    fn quorum_proposals_dir_path(&self) -> PathBuf {
240        self.path.join("quorum_proposals")
241    }
242
243    fn quorum_proposals2_dir_path(&self) -> PathBuf {
244        self.path.join("quorum_proposals2")
245    }
246
247    fn upgrade_certificate_dir_path(&self) -> PathBuf {
248        self.path.join("upgrade_certificate")
249    }
250
251    fn stake_table_dir_path(&self) -> PathBuf {
252        self.path.join("stake_table")
253    }
254
255    fn next_epoch_qc(&self) -> PathBuf {
256        self.path.join("next_epoch_quorum_certificate")
257    }
258
259    fn eqc(&self) -> PathBuf {
260        self.path.join("eqc")
261    }
262
263    fn libp2p_dht_path(&self) -> PathBuf {
264        self.path.join("libp2p_dht")
265    }
266    fn epoch_drb_result_dir_path(&self) -> PathBuf {
267        self.path.join("epoch_drb_result")
268    }
269
270    fn epoch_root_block_header_dir_path(&self) -> PathBuf {
271        self.path.join("epoch_root_block_header")
272    }
273
274    fn finalized_state_cert_dir_path(&self) -> PathBuf {
275        self.path.join("finalized_state_cert")
276    }
277
278    fn state_cert_dir_path(&self) -> PathBuf {
279        self.path.join("state_cert")
280    }
281
282    fn decided_cert2_dir_path(&self) -> PathBuf {
283        self.path.join("decided_cert2")
284    }
285
286    /// cert2 is only persisted for the view that is directly finalized
287    /// (the newest leaf in a decided chain). Ancestor views finalized
288    /// indirectly have no cert2 file on disk
289    /// for those, this returns `Ok(None)`
290    fn load_cert2(&self, view: ViewNumber) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
291        let file_path = self
292            .decided_cert2_dir_path()
293            .join(view.u64().to_string())
294            .with_extension("bin");
295        if !file_path.is_file() {
296            return Ok(None);
297        }
298        let bytes = fs::read(&file_path).context("read cert2")?;
299        Ok(Some(
300            bincode::deserialize(&bytes).context("deserialize cert2")?,
301        ))
302    }
303
304    fn update_migration(&mut self) -> anyhow::Result<()> {
305        let path = self.migration();
306        let bytes = bincode::serialize(&self.migrated)?;
307
308        self.replace(
309            &path,
310            |_| Ok(true),
311            |mut file| {
312                file.write_all(&bytes)?;
313                Ok(())
314            },
315        )
316    }
317
318    /// Overwrite a file if a condition is met.
319    ///
320    /// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred`
321    /// returns `true`, or if there was no existing file, then `write` is called to update the
322    /// contents of the file. `write` receives a truncated file open in write mode and sets the
323    /// contents of the file.
324    ///
325    /// The final replacement of the original file is atomic; that is, `path` will be modified only
326    /// if the entire update succeeds.
327    fn replace(
328        &mut self,
329        path: &Path,
330        pred: impl FnOnce(File) -> anyhow::Result<bool>,
331        write: impl FnOnce(File) -> anyhow::Result<()>,
332    ) -> anyhow::Result<()> {
333        if path.is_file() {
334            // If there is an existing file, check if it is suitable to replace. Note that this
335            // check is not atomic with respect to the subsequent write at the file system level,
336            // but this object is the only one which writes to this file, and we have a mutable
337            // reference, so this should be safe.
338            if !pred(File::open(path)?)? {
339                // If we are not overwriting the file, we are done and consider the whole operation
340                // successful.
341                return Ok(());
342            }
343        }
344
345        // Either there is no existing file or we have decided to overwrite the file. Write the new
346        // contents into a temporary file so we can update `path` atomically using `rename`.
347        let mut swap_path = path.to_owned();
348        swap_path.set_extension("swp");
349        let swap = OpenOptions::new()
350            .write(true)
351            .truncate(true)
352            .create(true)
353            .open(&swap_path)?;
354        write(swap)?;
355
356        // Now we can replace the original file.
357        fs::rename(swap_path, path)?;
358
359        Ok(())
360    }
361
362    fn collect_garbage(
363        &mut self,
364        decided_view: ViewNumber,
365        prune_intervals: &[RangeInclusive<ViewNumber>],
366    ) -> anyhow::Result<()> {
367        let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
368
369        self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
370        self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
371        self.prune_files(
372            self.quorum_proposals2_dir_path(),
373            prune_view,
374            None,
375            prune_intervals,
376        )?;
377        self.prune_files(
378            self.state_cert_dir_path(),
379            prune_view,
380            None,
381            prune_intervals,
382        )?;
383
384        // Save the most recent leaf as it will be our anchor point if the node restarts.
385        self.prune_files(
386            self.decided_leaf2_path(),
387            prune_view,
388            Some(decided_view),
389            prune_intervals,
390        )?;
391
392        Ok(())
393    }
394
395    fn prune_files(
396        &mut self,
397        dir_path: PathBuf,
398        prune_view: ViewNumber,
399        keep_decided_view: Option<ViewNumber>,
400        prune_intervals: &[RangeInclusive<ViewNumber>],
401    ) -> anyhow::Result<()> {
402        if !dir_path.is_dir() {
403            return Ok(());
404        }
405
406        for (file_view, path) in view_files(dir_path)? {
407            // If the view is the anchor view, keep it no matter what.
408            if let Some(decided_view) = keep_decided_view
409                && decided_view == file_view
410            {
411                continue;
412            }
413            // Otherwise, delete it if it is time to prune this view _or_ if the given intervals,
414            // which we've already successfully processed, contain the view; in this case we simply
415            // don't need it anymore.
416            if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
417                fs::remove_file(&path)?;
418            }
419        }
420
421        Ok(())
422    }
423
424    fn parse_decided_leaf(
425        &self,
426        bytes: &[u8],
427    ) -> anyhow::Result<(Leaf2, CertificatePair<SeqTypes>)> {
428        // Old versions of the software did not store the next epoch QC. Without knowing which
429        // version this file was created with, we can simply try parsing both ways and then
430        // reconstruct a certificate pair with or without the next epoch QC.
431        match bincode::deserialize(bytes) {
432            Ok((leaf, cert)) => Ok((leaf, cert)),
433            Err(err) => {
434                tracing::info!(
435                    "error parsing decided leaf, maybe file was created without next epoch QC? \
436                     {err}"
437                );
438                let (leaf, qc) =
439                    bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(bytes)
440                        .context("parsing decided leaf")?;
441                Ok((leaf, CertificatePair::non_epoch_change(qc)))
442            },
443        }
444    }
445
446    /// Generate events based on persisted decided leaves.
447    ///
448    /// Returns a list of closed intervals of views which can be safely deleted, as all leaves
449    /// within these view ranges have been processed by the event consumer.
450    async fn generate_decide_events(
451        &mut self,
452        view: ViewNumber,
453        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
454        consumer: &impl EventConsumer,
455    ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
456        // Generate a decide event for each leaf, to be processed by the event consumer. We make a
457        // separate event for each leaf because it is possible we have non-consecutive leaves in our
458        // storage, which would not be valid as a single decide with a single leaf chain.
459        let mut leaves = BTreeMap::new();
460        for (v, path) in view_files(self.decided_leaf2_path())? {
461            if v > view {
462                continue;
463            }
464
465            let bytes =
466                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
467            let (mut leaf, cert) = self.parse_decided_leaf(&bytes)?;
468
469            // Include the VID share if available.
470            let vid_proposal = self.load_vid_share(v)?;
471            if vid_proposal.is_none() {
472                tracing::debug!(?v, "VID share not available at decide");
473            }
474            let vid_share = vid_proposal.as_ref().map(|proposal| proposal.data.clone());
475
476            // Move the state cert to the finalized dir if it exists.
477            let state_cert = self.store_finalized_state_cert(v)?;
478
479            // Fill in the full block payload using the DA proposals we had persisted.
480            if let Some(proposal) = self.load_da_proposal(v)? {
481                let payload = Payload::from_bytes(
482                    &proposal.data.encoded_transactions,
483                    &proposal.data.metadata,
484                );
485                leaf.fill_block_payload_unchecked(payload);
486            } else {
487                tracing::debug!(?v, "DA proposal not available at decide");
488            }
489
490            let info = LeafInfo {
491                leaf,
492                vid_share,
493                state_cert,
494                // Note: the following fields are not used in Decide event processing, and should be
495                // removed. For now, we just default them.
496                state: Default::default(),
497                delta: Default::default(),
498            };
499
500            leaves.insert(v, (info, cert));
501        }
502
503        // The invariant is that the oldest existing leaf in the `anchor_leaf` table -- if there is
504        // one -- was always included in the _previous_ decide event...but not removed from the
505        // database, because we always persist the most recent anchor leaf.
506        if let Some((oldest_view, _)) = leaves.first_key_value() {
507            // The only exception is when the oldest leaf is the genesis leaf; then there was no
508            // previous decide event.
509            if *oldest_view > ViewNumber::genesis() {
510                leaves.pop_first();
511            }
512        }
513
514        let mut intervals = vec![];
515        let mut current_interval = None;
516        for (view, (leaf, cert)) in leaves {
517            let height = leaf.leaf.block_header().block_number();
518
519            let event = if leaf.leaf.block_header().version() >= versions::NEW_PROTOCOL_VERSION {
520                let cert2 = self.load_cert2(view)?;
521                // One event per view. cert2 is only stored for the
522                // directly finalized view
523                // ancestors get `cert2: None`,
524                // which is what update() expects for indirectly decided leaves.
525                CoordinatorEvent::NewDecide {
526                    leaf_infos: vec![leaf],
527                    cert1: cert.qc().clone(),
528                    cert2,
529                }
530            } else {
531                let deciding_qc = deciding_qc
532                    .as_ref()
533                    .filter(|qc| qc.view_number() == cert.view_number() + 1)
534                    .cloned();
535                CoordinatorEvent::LegacyEvent(Event {
536                    view_number: view,
537                    event: EventType::Decide {
538                        committing_qc: Arc::new(cert),
539                        deciding_qc,
540                        leaf_chain: Arc::new(vec![leaf]),
541                        block_size: None,
542                    },
543                })
544            };
545            consumer.handle_event(&event).await?;
546
547            if let Some((start, end, current_height)) = current_interval.as_mut() {
548                if height == *current_height + 1 {
549                    // If we have a chain of consecutive leaves, extend the current interval of
550                    // views which are safe to delete.
551                    *current_height += 1;
552                    *end = view;
553                } else {
554                    // Otherwise, end the current interval and start a new one.
555                    intervals.push(*start..=*end);
556                    current_interval = Some((view, view, height));
557                }
558            } else {
559                // Start a new interval.
560                current_interval = Some((view, view, height));
561            }
562        }
563        if let Some((start, end, _)) = current_interval {
564            intervals.push(start..=end);
565        }
566
567        Ok(intervals)
568    }
569
570    fn load_da_proposal(
571        &self,
572        view: ViewNumber,
573    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
574        let dir_path = self.da2_dir_path();
575
576        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
577
578        if !file_path.exists() {
579            return Ok(None);
580        }
581
582        let da_bytes = fs::read(file_path)?;
583
584        let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
585            bincode::deserialize(&da_bytes)?;
586        Ok(Some(da_proposal))
587    }
588
589    fn load_vid_share(
590        &self,
591        view: ViewNumber,
592    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
593        let dir_path = self.vid2_dir_path();
594
595        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
596
597        if !file_path.exists() {
598            return Ok(None);
599        }
600
601        let vid_share_bytes = fs::read(file_path)?;
602        let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
603            bincode::deserialize(&vid_share_bytes)?;
604        Ok(Some(vid_share))
605    }
606
607    fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
608        tracing::info!("Checking `Leaf2` to load the anchor leaf.");
609        if self.decided_leaf2_path().is_dir() {
610            let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
611
612            // Return the latest decided leaf.
613            for (_, path) in view_files(self.decided_leaf2_path())? {
614                let bytes =
615                    fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
616                let (leaf, cert) = self.parse_decided_leaf(&bytes)?;
617                if let Some((anchor_leaf, _)) = &anchor {
618                    if leaf.view_number() > anchor_leaf.view_number() {
619                        anchor = Some((leaf, cert.qc().clone()));
620                    }
621                } else {
622                    anchor = Some((leaf, cert.qc().clone()));
623                }
624            }
625
626            return Ok(anchor);
627        }
628
629        tracing::warn!(
630            "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
631             This is very likely to fail."
632        );
633        if self.legacy_anchor_leaf_path().is_file() {
634            // We may have an old version of storage, where there is just a single file for the
635            // anchor leaf. Read it and return the contents.
636            let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
637
638            // The first 8 bytes just contain the height of the leaf. We can skip this.
639            file.seek(SeekFrom::Start(8)).context("seek")?;
640            let bytes = file
641                .bytes()
642                .collect::<Result<Vec<_>, _>>()
643                .context("read")?;
644            return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
645        }
646
647        Ok(None)
648    }
649
650    fn store_finalized_state_cert(
651        &mut self,
652        view: ViewNumber,
653    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
654        let dir_path = self.state_cert_dir_path();
655        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
656
657        if !file_path.exists() {
658            return Ok(None);
659        }
660
661        let bytes = fs::read(&file_path)?;
662
663        let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
664            bincode::deserialize(&bytes).or_else(|err_v2| {
665                tracing::info!(
666                    error = %err_v2,
667                    path = %file_path.display(),
668                    "Failed to deserialize state certificate, attempting with v1"
669                );
670
671                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
672                    .map(Into::into)
673                    .with_context(|| {
674                        format!(
675                            "Failed to deserialize with both v2 and v1 from file '{}'. error: \
676                             {err_v2}",
677                            file_path.display()
678                        )
679                    })
680            })?;
681
682        let epoch = state_cert.epoch.u64();
683        let finalized_dir_path = self.finalized_state_cert_dir_path();
684        fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;
685
686        let finalized_file_path = finalized_dir_path
687            .join(epoch.to_string())
688            .with_extension("txt");
689
690        self.replace(
691            &finalized_file_path,
692            |_| Ok(true),
693            |mut file| {
694                file.write_all(&bytes)?;
695                Ok(())
696            },
697        )
698        .context(format!(
699            "finalizing light client state update certificate file for epoch {epoch:?}"
700        ))?;
701
702        Ok(Some(state_cert))
703    }
704}
705
706#[async_trait]
707impl SequencerPersistence for Persistence {
708    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
709        Ok(())
710    }
711
712    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
713        let inner = self.inner.read().await;
714        let path = inner.config_path();
715        if !path.is_file() {
716            tracing::info!("config not found at {}", path.display());
717            return Ok(None);
718        }
719        tracing::info!("loading config from {}", path.display());
720
721        let bytes =
722            fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
723        let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
724        let json = migrate_network_config(json).context("migration of network config failed")?;
725        let config = serde_json::from_value(json).context("malformed config file")?;
726        Ok(Some(config))
727    }
728
729    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
730        let inner = self.inner.write().await;
731        let path = inner.config_path();
732        tracing::info!("saving config to {}", path.display());
733        Ok(cfg.to_file(path.display().to_string())?)
734    }
735
736    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
737        let inner = self.inner.read().await;
738        let path = inner.voted_view_path();
739        if !path.is_file() {
740            return Ok(None);
741        }
742        let bytes = fs::read(inner.voted_view_path())?
743            .try_into()
744            .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
745        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
746    }
747
748    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
749        let inner = self.inner.read().await;
750        let path = inner.restart_view_path();
751        if !path.is_file() {
752            return Ok(None);
753        }
754        let bytes = fs::read(path)?
755            .try_into()
756            .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
757        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
758    }
759
760    async fn append_decided_leaves(
761        &self,
762        view: ViewNumber,
763        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
764        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
765        consumer: &impl EventConsumer,
766    ) -> anyhow::Result<()> {
767        let mut inner = self.inner.write().await;
768        let path = inner.decided_leaf2_path();
769
770        // Ensure the anchor leaf directory exists.
771        fs::create_dir_all(&path).context("creating anchor leaf directory")?;
772
773        // Earlier versions stored only a single decided leaf in a regular file. If our storage is
774        // still on this version, migrate to a directory structure storing (possibly) many leaves.
775        let legacy_path = inner.legacy_anchor_leaf_path();
776        if !path.is_dir() && legacy_path.is_file() {
777            tracing::info!("migrating to multi-leaf storage");
778
779            // Move the existing data into the new directory.
780            let (leaf, qc) = inner
781                .load_anchor_leaf()?
782                .context("anchor leaf file exists but unable to load contents")?;
783            let view = leaf.view_number().u64();
784            let bytes = bincode::serialize(&(leaf, qc))?;
785            let new_file = path.join(view.to_string()).with_extension("txt");
786            inner
787                .replace(
788                    &new_file,
789                    |_| Ok(true),
790                    |mut file| {
791                        file.write_all(&bytes)?;
792                        Ok(())
793                    },
794                )
795                .context(format!("writing anchor leaf file {view}"))?;
796
797            // Now we can remove the old file.
798            fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
799        }
800
801        for (info, cert) in leaf_chain {
802            let view = info.leaf.view_number().u64();
803            let file_path = path.join(view.to_string()).with_extension("txt");
804            inner.replace(
805                &file_path,
806                |_| {
807                    // Don't overwrite an existing leaf, but warn about it as this is likely not
808                    // intended behavior from HotShot.
809                    tracing::warn!(view, "duplicate decided leaf");
810                    Ok(false)
811                },
812                |mut file| {
813                    let bytes = bincode::serialize(&(&info.leaf, cert))?;
814                    file.write_all(&bytes)?;
815                    Ok(())
816                },
817            )?;
818        }
819
820        match inner
821            .generate_decide_events(view, deciding_qc, consumer)
822            .await
823        {
824            Err(err) => {
825                // Event processing failure is not an error, since by this point we have at least
826                // managed to persist the decided leaves successfully, and the event processing will
827                // just run again at the next decide.
828                tracing::warn!(?view, "event processing failed: {err:#}");
829            },
830            Ok(intervals) => {
831                if let Err(err) = inner.collect_garbage(view, &intervals) {
832                    // Similarly, garbage collection is not an error. We have done everything we
833                    // strictly needed to do, and GC will run again at the next decide. Log the
834                    // error but do not return it.
835                    tracing::warn!(?view, "GC failed: {err:#}");
836                }
837            },
838        }
839
840        Ok(())
841    }
842
843    async fn load_anchor_leaf(
844        &self,
845    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
846        self.inner.read().await.load_anchor_leaf()
847    }
848
849    async fn load_da_proposal(
850        &self,
851        view: ViewNumber,
852    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
853        self.inner.read().await.load_da_proposal(view)
854    }
855
856    async fn load_vid_share(
857        &self,
858        view: ViewNumber,
859    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
860        self.inner.read().await.load_vid_share(view)
861    }
862
863    async fn append_vid(
864        &self,
865        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
866    ) -> anyhow::Result<()> {
867        let mut inner = self.inner.write().await;
868        let view_number = proposal.data.view_number().u64();
869        let dir_path = inner.vid2_dir_path();
870
871        fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
872
873        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
874        inner.replace(
875            &file_path,
876            |_| {
877                // Don't overwrite an existing share, but warn about it as this is likely not intended
878                // behavior from HotShot.
879                tracing::warn!(view_number, "duplicate VID share");
880                Ok(false)
881            },
882            |mut file| {
883                let proposal_bytes = bincode::serialize(proposal).context("serialize proposal")?;
884                let now = Instant::now();
885                file.write_all(&proposal_bytes)?;
886                self.metrics
887                    .internal_append_vid_duration
888                    .add_point(now.elapsed().as_secs_f64());
889                Ok(())
890            },
891        )
892    }
893
894    async fn append_da(
895        &self,
896        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
897        _vid_commit: VidCommitment,
898    ) -> anyhow::Result<()> {
899        let mut inner = self.inner.write().await;
900        let view_number = proposal.data.view_number().u64();
901        let dir_path = inner.da_dir_path();
902
903        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
904
905        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
906        inner.replace(
907            &file_path,
908            |_| {
909                // Don't overwrite an existing proposal, but warn about it as this is likely not
910                // intended behavior from HotShot.
911                tracing::warn!(view_number, "duplicate DA proposal");
912                Ok(false)
913            },
914            |mut file| {
915                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
916                let now = Instant::now();
917                file.write_all(&proposal_bytes)?;
918                self.metrics
919                    .internal_append_da_duration
920                    .add_point(now.elapsed().as_secs_f64());
921                Ok(())
922            },
923        )
924    }
925    async fn record_action(
926        &self,
927        view: ViewNumber,
928        _epoch: Option<EpochNumber>,
929        action: HotShotAction,
930    ) -> anyhow::Result<()> {
931        // Todo Remove this after https://github.com/EspressoSystems/espresso-network/issues/1931
932        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
933            return Ok(());
934        }
935        let mut inner = self.inner.write().await;
936        let path = &inner.voted_view_path();
937        inner.replace(
938            path,
939            |mut file| {
940                let mut bytes = vec![];
941                file.read_to_end(&mut bytes)?;
942                let bytes = bytes
943                    .try_into()
944                    .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
945                let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
946
947                // Overwrite the file if the saved view is older than the new view.
948                Ok(saved_view < view)
949            },
950            |mut file| {
951                file.write_all(&view.u64().to_le_bytes())?;
952                Ok(())
953            },
954        )?;
955
956        if matches!(action, HotShotAction::Vote) {
957            let restart_view_path = &inner.restart_view_path();
958            let restart_view = view + 1;
959            inner.replace(
960                restart_view_path,
961                |mut file| {
962                    let mut bytes = vec![];
963                    file.read_to_end(&mut bytes)?;
964                    let bytes = bytes
965                        .try_into()
966                        .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
967                    let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
968
969                    // Overwrite the file if the saved view is older than the new view.
970                    Ok(saved_view < restart_view)
971                },
972                |mut file| {
973                    file.write_all(&restart_view.u64().to_le_bytes())?;
974                    Ok(())
975                },
976            )?;
977        }
978        Ok(())
979    }
980
981    async fn append_quorum_proposal2(
982        &self,
983        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
984    ) -> anyhow::Result<()> {
985        let mut inner = self.inner.write().await;
986        let view_number = proposal.data.view_number().u64();
987        let dir_path = inner.quorum_proposals2_dir_path();
988
989        fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
990
991        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
992        inner.replace(
993            &file_path,
994            |_| {
995                // Always overwrite the previous file
996                Ok(true)
997            },
998            |mut file| {
999                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1000                let now = Instant::now();
1001                file.write_all(&proposal_bytes)?;
1002                self.metrics
1003                    .internal_append_quorum2_duration
1004                    .add_point(now.elapsed().as_secs_f64());
1005                Ok(())
1006            },
1007        )
1008    }
1009
1010    async fn append_cert2(
1011        &self,
1012        view: ViewNumber,
1013        cert2: Certificate2<SeqTypes>,
1014    ) -> anyhow::Result<()> {
1015        let mut inner = self.inner.write().await;
1016        let dir_path = inner.decided_cert2_dir_path();
1017        fs::create_dir_all(dir_path.clone()).context("failed to create decided_cert2 dir")?;
1018        let file_path = dir_path.join(view.u64().to_string()).with_extension("bin");
1019        inner.replace(
1020            &file_path,
1021            |_| Ok(true),
1022            |mut file| {
1023                let bytes = bincode::serialize(&cert2).context("serialize cert2")?;
1024                file.write_all(&bytes)?;
1025                Ok(())
1026            },
1027        )
1028    }
1029
1030    async fn load_cert2(&self, view: ViewNumber) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
1031        let inner = self.inner.read().await;
1032        let dir_path = inner.decided_cert2_dir_path();
1033        let file_path = dir_path.join(view.u64().to_string()).with_extension("bin");
1034        if !file_path.is_file() {
1035            return Ok(None);
1036        }
1037        let bytes = fs::read(&file_path).context("read cert2")?;
1038        Ok(Some(
1039            bincode::deserialize(&bytes).context("deserialize cert2")?,
1040        ))
1041    }
1042    async fn load_quorum_proposals(
1043        &self,
1044    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
1045    {
1046        let inner = self.inner.read().await;
1047
1048        // First, get the proposal directory.
1049        let dir_path = inner.quorum_proposals2_dir_path();
1050        if !dir_path.is_dir() {
1051            return Ok(Default::default());
1052        }
1053
1054        // Read quorum proposals from every data file in this directory.
1055        let mut map = BTreeMap::new();
1056        for (view, path) in view_files(&dir_path)? {
1057            let proposal_bytes = fs::read(path)?;
1058            let Some(proposal) = bincode::deserialize::<
1059                Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1060            >(&proposal_bytes)
1061            .or_else(|error| {
1062                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1063                    &proposal_bytes,
1064                )
1065                .map(convert_proposal)
1066                .inspect_err(|err_v3| {
1067                    // At this point, if the file contents are invalid, it is most likely an
1068                    // error rather than a miscellaneous file somehow ending up in the
1069                    // directory. However, we continue on, because it is better to collect as
1070                    // many proposals as we can rather than letting one bad proposal cause the
1071                    // entire operation to fail, and it is still possible that this was just
1072                    // some unintended file whose name happened to match the naming convention.
1073
1074                    tracing::warn!(
1075                        ?view,
1076                        %error,
1077                        error_v3 = %err_v3,
1078                        "ignoring malformed quorum proposal file"
1079                    );
1080                })
1081            })
1082            .ok() else {
1083                continue;
1084            };
1085
1086            let proposal2 = convert_proposal(proposal);
1087
1088            // Push to the map and we're done.
1089            map.insert(view, proposal2);
1090        }
1091
1092        Ok(map)
1093    }
1094
1095    async fn load_quorum_proposal(
1096        &self,
1097        view: ViewNumber,
1098    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1099        let inner = self.inner.read().await;
1100        let dir_path = inner.quorum_proposals2_dir_path();
1101        let file_path = dir_path.join(view.to_string()).with_extension("txt");
1102        let bytes = fs::read(file_path)?;
1103        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1104            bincode::deserialize(&bytes).or_else(|error| {
1105                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1106                    &bytes,
1107                )
1108                .map(convert_proposal)
1109                .context(format!(
1110                    "Failed to deserialize quorum proposal for view {view:?}: {error}."
1111                ))
1112            })?;
1113        Ok(proposal)
1114    }
1115
1116    async fn load_upgrade_certificate(
1117        &self,
1118    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1119        let inner = self.inner.read().await;
1120        let path = inner.upgrade_certificate_dir_path();
1121        if !path.is_file() {
1122            return Ok(None);
1123        }
1124        let bytes = fs::read(&path).context("read")?;
1125        Ok(Some(
1126            bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1127        ))
1128    }
1129
1130    async fn store_upgrade_certificate(
1131        &self,
1132        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1133    ) -> anyhow::Result<()> {
1134        let mut inner = self.inner.write().await;
1135        let path = &inner.upgrade_certificate_dir_path();
1136        let certificate = match decided_upgrade_certificate {
1137            Some(cert) => cert,
1138            None => return Ok(()),
1139        };
1140        inner.replace(
1141            path,
1142            |_| {
1143                // Always overwrite the previous file.
1144                Ok(true)
1145            },
1146            |mut file| {
1147                let bytes =
1148                    bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1149                file.write_all(&bytes)?;
1150                Ok(())
1151            },
1152        )
1153    }
1154
1155    async fn store_next_epoch_quorum_certificate(
1156        &self,
1157        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1158    ) -> anyhow::Result<()> {
1159        let mut inner = self.inner.write().await;
1160        let path = &inner.next_epoch_qc();
1161
1162        inner.replace(
1163            path,
1164            |_| {
1165                // Always overwrite the previous file.
1166                Ok(true)
1167            },
1168            |mut file| {
1169                let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1170                file.write_all(&bytes)?;
1171                Ok(())
1172            },
1173        )
1174    }
1175
1176    async fn load_next_epoch_quorum_certificate(
1177        &self,
1178    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1179        let inner = self.inner.read().await;
1180        let path = inner.next_epoch_qc();
1181        if !path.is_file() {
1182            return Ok(None);
1183        }
1184        let bytes = fs::read(&path).context("read")?;
1185        Ok(Some(
1186            bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1187        ))
1188    }
1189
1190    async fn store_eqc(
1191        &self,
1192        high_qc: QuorumCertificate2<SeqTypes>,
1193        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1194    ) -> anyhow::Result<()> {
1195        let mut inner = self.inner.write().await;
1196        let path = &inner.eqc();
1197
1198        inner.replace(
1199            path,
1200            |_| {
1201                // Always overwrite the previous file.
1202                Ok(true)
1203            },
1204            |mut file| {
1205                let bytes = bincode::serialize(&(high_qc, next_epoch_high_qc))
1206                    .context("serializing next epoch qc")?;
1207                file.write_all(&bytes)?;
1208                Ok(())
1209            },
1210        )
1211    }
1212
1213    async fn load_eqc(
1214        &self,
1215    ) -> Option<(
1216        QuorumCertificate2<SeqTypes>,
1217        NextEpochQuorumCertificate2<SeqTypes>,
1218    )> {
1219        let inner = self.inner.read().await;
1220        let path = inner.eqc();
1221        if !path.is_file() {
1222            return None;
1223        }
1224        let bytes = fs::read(&path).ok()?;
1225
1226        bincode::deserialize(&bytes).ok()
1227    }
1228
1229    async fn append_da2(
1230        &self,
1231        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1232        _vid_commit: VidCommitment,
1233    ) -> anyhow::Result<()> {
1234        let mut inner = self.inner.write().await;
1235        let view_number = proposal.data.view_number().u64();
1236        let dir_path = inner.da2_dir_path();
1237
1238        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
1239
1240        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
1241        inner.replace(
1242            &file_path,
1243            |_| {
1244                // Don't overwrite an existing proposal, but warn about it as this is likely not
1245                // intended behavior from HotShot.
1246                tracing::warn!(view_number, "duplicate DA proposal");
1247                Ok(false)
1248            },
1249            |mut file| {
1250                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1251                let now = Instant::now();
1252                file.write_all(&proposal_bytes)?;
1253                self.metrics
1254                    .internal_append_da2_duration
1255                    .add_point(now.elapsed().as_secs_f64());
1256                Ok(())
1257            },
1258        )
1259    }
1260
1261    async fn append_proposal2(
1262        &self,
1263        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1264    ) -> anyhow::Result<()> {
1265        self.append_quorum_proposal2(proposal).await
1266    }
1267
1268    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1269        let mut inner = self.inner.write().await;
1270
1271        if inner.migrated.contains("anchor_leaf") {
1272            tracing::info!("decided leaves already migrated");
1273            return Ok(());
1274        }
1275
1276        let new_leaf_dir = inner.decided_leaf2_path();
1277
1278        fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2  dir")?;
1279
1280        let old_leaf_dir = inner.decided_leaf_path();
1281        if !old_leaf_dir.is_dir() {
1282            return Ok(());
1283        }
1284
1285        tracing::warn!("migrating decided leaves..");
1286        for entry in fs::read_dir(old_leaf_dir)? {
1287            let entry = entry?;
1288            let path = entry.path();
1289
1290            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1291                continue;
1292            };
1293            let Ok(view) = file.parse::<u64>() else {
1294                continue;
1295            };
1296
1297            let bytes =
1298                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1299            let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1300                .context(format!("parsing decided leaf {}", path.display()))?;
1301
1302            let leaf2: Leaf2 = leaf.into();
1303            let cert = CertificatePair::non_epoch_change(qc.to_qc2());
1304
1305            let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1306
1307            inner.replace(
1308                &new_leaf_path,
1309                |_| {
1310                    tracing::warn!(view, "duplicate decided leaf");
1311                    Ok(false)
1312                },
1313                |mut file| {
1314                    let bytes = bincode::serialize(&(&leaf2.clone(), cert))?;
1315                    file.write_all(&bytes)?;
1316                    Ok(())
1317                },
1318            )?;
1319
1320            if view % 100 == 0 {
1321                tracing::info!(view, "decided leaves migration progress");
1322            }
1323        }
1324
1325        inner.migrated.insert("anchor_leaf".to_string());
1326        inner.update_migration()?;
1327        tracing::warn!("successfully migrated decided leaves");
1328        Ok(())
1329    }
1330    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1331        let mut inner = self.inner.write().await;
1332
1333        if inner.migrated.contains("da_proposal") {
1334            tracing::info!("da proposals already migrated");
1335            return Ok(());
1336        }
1337
1338        let new_da_dir = inner.da2_dir_path();
1339
1340        fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;
1341
1342        let old_da_dir = inner.da_dir_path();
1343        if !old_da_dir.is_dir() {
1344            return Ok(());
1345        }
1346
1347        tracing::warn!("migrating da proposals..");
1348
1349        for entry in fs::read_dir(old_da_dir)? {
1350            let entry = entry?;
1351            let path = entry.path();
1352
1353            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1354                continue;
1355            };
1356            let Ok(view) = file.parse::<u64>() else {
1357                continue;
1358            };
1359
1360            let bytes =
1361                fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
1362            let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
1363                .context(format!("parsing da proposal {}", path.display()))?;
1364
1365            let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");
1366
1367            let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);
1368
1369            inner.replace(
1370                &new_da_path,
1371                |_| {
1372                    tracing::warn!(view, "duplicate DA proposal 2");
1373                    Ok(false)
1374                },
1375                |mut file| {
1376                    let bytes = bincode::serialize(&proposal2)?;
1377                    file.write_all(&bytes)?;
1378                    Ok(())
1379                },
1380            )?;
1381
1382            if view % 100 == 0 {
1383                tracing::info!(view, "DA proposals migration progress");
1384            }
1385        }
1386
1387        inner.migrated.insert("da_proposal".to_string());
1388        inner.update_migration()?;
1389        tracing::warn!("successfully migrated da proposals");
1390        Ok(())
1391    }
1392    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1393        let mut inner = self.inner.write().await;
1394
1395        if inner.migrated.contains("vid_share") {
1396            tracing::info!("vid shares already migrated");
1397            return Ok(());
1398        }
1399
1400        let new_vid_dir = inner.vid2_dir_path();
1401
1402        fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;
1403
1404        let old_vid_dir = inner.vid_dir_path();
1405        if !old_vid_dir.is_dir() {
1406            return Ok(());
1407        }
1408
1409        tracing::warn!("migrating vid shares..");
1410
1411        for entry in fs::read_dir(old_vid_dir)? {
1412            let entry = entry?;
1413            let path = entry.path();
1414
1415            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1416                continue;
1417            };
1418            let Ok(view) = file.parse::<u64>() else {
1419                continue;
1420            };
1421
1422            let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
1423            let proposal =
1424                bincode::deserialize::<Proposal<SeqTypes, VidDisperseShare0<SeqTypes>>>(&bytes)
1425                    .context(format!("parsing vid share {}", path.display()))?;
1426
1427            let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");
1428
1429            let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1430                convert_proposal(proposal);
1431
1432            inner.replace(
1433                &new_vid_path,
1434                |_| {
1435                    tracing::warn!(view, "duplicate VID share ");
1436                    Ok(false)
1437                },
1438                |mut file| {
1439                    let bytes = bincode::serialize(&proposal2)?;
1440                    file.write_all(&bytes)?;
1441                    Ok(())
1442                },
1443            )?;
1444
1445            if view % 100 == 0 {
1446                tracing::info!(view, "VID shares migration progress");
1447            }
1448        }
1449
1450        inner.migrated.insert("vid_share".to_string());
1451        inner.update_migration()?;
1452        tracing::warn!("successfully migrated vid shares");
1453        Ok(())
1454    }
1455
1456    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1457        let mut inner = self.inner.write().await;
1458
1459        if inner.migrated.contains("quorum_proposals") {
1460            tracing::info!("quorum proposals already migrated");
1461            return Ok(());
1462        }
1463
1464        let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();
1465
1466        fs::create_dir_all(new_quorum_proposals_dir.clone())
1467            .context("failed to create quorum proposals 2 dir")?;
1468
1469        let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
1470        if !old_quorum_proposals_dir.is_dir() {
1471            tracing::info!("no existing quorum proposals found for migration");
1472            return Ok(());
1473        }
1474
1475        tracing::warn!("migrating quorum proposals..");
1476        for entry in fs::read_dir(old_quorum_proposals_dir)? {
1477            let entry = entry?;
1478            let path = entry.path();
1479
1480            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1481                continue;
1482            };
1483            let Ok(view) = file.parse::<u64>() else {
1484                continue;
1485            };
1486
1487            let bytes =
1488                fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
1489            let proposal =
1490                bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
1491                    .context(format!("parsing quorum proposal {}", path.display()))?;
1492
1493            let new_file_path = new_quorum_proposals_dir
1494                .join(view.to_string())
1495                .with_extension("txt");
1496
1497            let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1498                convert_proposal(proposal);
1499
1500            inner.replace(
1501                &new_file_path,
1502                |_| {
1503                    tracing::warn!(view, "duplicate Quorum proposal2 ");
1504                    Ok(false)
1505                },
1506                |mut file| {
1507                    let bytes = bincode::serialize(&proposal2)?;
1508                    file.write_all(&bytes)?;
1509                    Ok(())
1510                },
1511            )?;
1512
1513            if view % 100 == 0 {
1514                tracing::info!(view, "Quorum proposals migration progress");
1515            }
1516        }
1517
1518        inner.migrated.insert("quorum_proposals".to_string());
1519        inner.update_migration()?;
1520        tracing::warn!("successfully migrated quorum proposals");
1521        Ok(())
1522    }
1523    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1524        Ok(())
1525    }
1526
1527    async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
1528        let mut inner = self.inner.write().await;
1529
1530        if inner.migrated.contains("x25519_keys") {
1531            tracing::info!("x25519_keys migration already complete");
1532            return Ok(());
1533        }
1534
1535        let path = inner.stake_table_dir_path();
1536        if !path.is_dir() {
1537            inner.migrated.insert("x25519_keys".to_string());
1538            inner.update_migration()?;
1539            return Ok(());
1540        }
1541
1542        tracing::warn!("migrating stake tables to add x25519 key fields...");
1543
1544        for (epoch, file_path) in epoch_files(&path)? {
1545            let bytes = fs::read(&file_path).with_context(|| {
1546                format!("failed to read stake table file at {}", file_path.display())
1547            })?;
1548
1549            let (stake, needs_rewrite) = deserialize_stake_table(&bytes).with_context(|| {
1550                format!(
1551                    "failed to deserialize stake table at {}",
1552                    file_path.display()
1553                )
1554            })?;
1555
1556            if !needs_rewrite {
1557                continue;
1558            }
1559
1560            // Atomic write: write to temp, then rename
1561            let new_bytes = bincode::serialize(&stake)?;
1562            let tmp_path = file_path.with_extension("txt.tmp");
1563            fs::write(&tmp_path, new_bytes)?;
1564            fs::rename(&tmp_path, &file_path)?;
1565
1566            tracing::info!(?epoch, "migrated stake table");
1567        }
1568
1569        // Also migrate validators JSON files (used by store_all_validators)
1570        let validators_dir = path.join("validators");
1571        if validators_dir.is_dir() {
1572            type LegacyJsonMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;
1573
1574            for entry in fs::read_dir(&validators_dir)? {
1575                let entry = entry?;
1576                let file_path = entry.path();
1577                if file_path.extension().is_some_and(|ext| ext == "json") {
1578                    let content = fs::read_to_string(&file_path)?;
1579
1580                    // Try current format first
1581                    if serde_json::from_str::<RegisteredValidatorMap>(&content).is_ok() {
1582                        continue;
1583                    }
1584
1585                    // Migrate from legacy format (no x25519_key/p2p_addr)
1586                    let legacy: LegacyJsonMap =
1587                        serde_json::from_str(&content).with_context(|| {
1588                            format!(
1589                                "failed to deserialize validators at {} (tried both formats)",
1590                                file_path.display()
1591                            )
1592                        })?;
1593
1594                    let migrated: RegisteredValidatorMap = legacy
1595                        .into_iter()
1596                        .map(|(addr, v)| (addr, v.migrate()))
1597                        .collect();
1598
1599                    // Atomic write: write to temp, then rename
1600                    let new_json = serde_json::to_string_pretty(&migrated)?;
1601                    let tmp_path = file_path.with_extension("json.tmp");
1602                    fs::write(&tmp_path, new_json)?;
1603                    fs::rename(&tmp_path, &file_path)?;
1604
1605                    tracing::info!(?file_path, "migrated validators file");
1606                }
1607            }
1608        }
1609
1610        inner.migrated.insert("x25519_keys".to_string());
1611        inner.update_migration()?;
1612        tracing::warn!("x25519_keys migration complete");
1613        Ok(())
1614    }
1615
1616    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1617        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1618            if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
1619                tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
1620            } else if loaded_drb_input.iteration >= drb_input.iteration {
1621                anyhow::bail!(
1622                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1623                    loaded_drb_input,
1624                    drb_input
1625                )
1626            }
1627        }
1628
1629        let mut inner = self.inner.write().await;
1630        let dir_path = inner.drb_dir_path();
1631
1632        fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1633
1634        let drb_input_bytes =
1635            bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1636
1637        let file_path = dir_path
1638            .join(drb_input.epoch.to_string())
1639            .with_extension("bin");
1640
1641        inner.replace(
1642            &file_path,
1643            |_| {
1644                // Always overwrite the previous file.
1645                Ok(true)
1646            },
1647            |mut file| {
1648                file.write_all(&drb_input_bytes).context(format!(
1649                    "writing epoch drb_input file for epoch {:?} at {:?}",
1650                    drb_input.epoch, file_path
1651                ))
1652            },
1653        )
1654    }
1655
1656    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1657        let inner = self.inner.read().await;
1658        let path = &inner.drb_dir_path();
1659        let file_path = path.join(epoch.to_string()).with_extension("bin");
1660        let bytes = fs::read(&file_path).context("read")?;
1661        Ok(bincode::deserialize(&bytes)
1662            .context(format!("failed to deserialize DrbInput for epoch {epoch}"))?)
1663    }
1664
1665    async fn store_drb_result(
1666        &self,
1667        epoch: EpochNumber,
1668        drb_result: DrbResult,
1669    ) -> anyhow::Result<()> {
1670        let mut inner = self.inner.write().await;
1671        let dir_path = inner.epoch_drb_result_dir_path();
1672
1673        fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1674
1675        let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1676
1677        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1678
1679        inner.replace(
1680            &file_path,
1681            |_| {
1682                // Always overwrite the previous file.
1683                Ok(true)
1684            },
1685            |mut file| {
1686                file.write_all(&drb_result_bytes)
1687                    .context(format!("writing epoch drb result file for epoch {epoch:?}"))
1688            },
1689        )
1690    }
1691
1692    async fn store_epoch_root(
1693        &self,
1694        epoch: EpochNumber,
1695        block_header: <SeqTypes as NodeType>::BlockHeader,
1696    ) -> anyhow::Result<()> {
1697        let mut inner = self.inner.write().await;
1698        let dir_path = inner.epoch_root_block_header_dir_path();
1699
1700        fs::create_dir_all(dir_path.clone())
1701            .context("failed to create epoch root block header dir")?;
1702
1703        let block_header_bytes =
1704            bincode::serialize(&block_header).context("serialize block header")?;
1705
1706        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1707        inner
1708            .replace(
1709                &file_path,
1710                |_| Ok(true),
1711                |mut file| {
1712                    file.write_all(&block_header_bytes)?;
1713                    Ok(())
1714                },
1715            )
1716            .context(format!(
1717                "writing epoch root block header file for epoch {epoch:?}"
1718            ))?;
1719
1720        Ok(())
1721    }
1722
1723    async fn add_state_cert(
1724        &self,
1725        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1726    ) -> anyhow::Result<()> {
1727        let mut inner = self.inner.write().await;
1728        // let epoch = state_cert.epoch;
1729        let view = state_cert.light_client_state.view_number;
1730        let dir_path = inner.state_cert_dir_path();
1731
1732        fs::create_dir_all(dir_path.clone())
1733            .context("failed to create light client state update certificate dir")?;
1734
1735        let bytes = bincode::serialize(&state_cert)
1736            .context("serialize light client state update certificate")?;
1737
1738        let file_path = dir_path.join(view.to_string()).with_extension("txt");
1739        inner
1740            .replace(
1741                &file_path,
1742                |_| Ok(true),
1743                |mut file| {
1744                    file.write_all(&bytes)?;
1745                    Ok(())
1746                },
1747            )
1748            .context(format!(
1749                "writing light client state update certificate file for view {view:?}"
1750            ))?;
1751
1752        Ok(())
1753    }
1754
1755    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1756        let inner = self.inner.read().await;
1757        let drb_dir_path = inner.epoch_drb_result_dir_path();
1758        let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1759
1760        let mut result = Vec::new();
1761
1762        if !drb_dir_path.is_dir() {
1763            return Ok(Vec::new());
1764        }
1765        for (epoch, path) in epoch_files(drb_dir_path)? {
1766            let bytes =
1767                fs::read(&path).context(format!("reading epoch drb result {}", path.display()))?;
1768            let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1769                .context(format!("parsing epoch drb result {}", path.display()))?;
1770
1771            let block_header_path = block_header_dir_path
1772                .join(epoch.to_string())
1773                .with_extension("txt");
1774            let block_header = if block_header_path.is_file() {
1775                let bytes = fs::read(&block_header_path).context(format!(
1776                    "reading epoch root block header {}",
1777                    block_header_path.display()
1778                ))?;
1779                Some(
1780                    bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes).context(
1781                        format!(
1782                            "parsing epoch root block header {}",
1783                            block_header_path.display()
1784                        ),
1785                    )?,
1786                )
1787            } else {
1788                None
1789            };
1790
1791            result.push(InitializerEpochInfo::<SeqTypes> {
1792                epoch,
1793                drb_result,
1794                block_header,
1795            });
1796        }
1797
1798        result.sort_by_key(|a| a.epoch);
1799
1800        // Keep only the most recent epochs
1801        let start = result
1802            .len()
1803            .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1804        let recent = result[start..].to_vec();
1805
1806        Ok(recent)
1807    }
1808
1809    async fn load_state_cert(
1810        &self,
1811    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1812        let inner = self.inner.read().await;
1813        let dir_path = inner.finalized_state_cert_dir_path();
1814
1815        if !dir_path.is_dir() {
1816            return Ok(None);
1817        }
1818
1819        let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;
1820
1821        for (epoch, path) in epoch_files(dir_path)? {
1822            if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
1823                continue;
1824            }
1825            let bytes = fs::read(&path).context(format!(
1826                "reading light client state update certificate {}",
1827                path.display()
1828            ))?;
1829            let cert =
1830                bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1831                    .or_else(|error| {
1832                        tracing::info!(
1833                            %error,
1834                            path = %path.display(),
1835                            "Failed to deserialize LightClientStateUpdateCertificateV2"
1836                        );
1837
1838                        bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
1839                            &bytes,
1840                        )
1841                        .map(Into::into)
1842                        .with_context(|| {
1843                            format!(
1844                                "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1845                                path.display()
1846                            )
1847                        })
1848                    })?;
1849
1850            result = Some(cert);
1851        }
1852
1853        Ok(result)
1854    }
1855
1856    async fn get_state_cert_by_epoch(
1857        &self,
1858        epoch: u64,
1859    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1860        let inner = self.inner.read().await;
1861        let dir_path = inner.finalized_state_cert_dir_path();
1862
1863        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1864
1865        if !file_path.exists() {
1866            return Ok(None);
1867        }
1868
1869        let bytes = fs::read(&file_path).context(format!(
1870            "reading light client state update certificate {}",
1871            file_path.display()
1872        ))?;
1873
1874        let cert = bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1875            .or_else(|error| {
1876                tracing::info!(
1877                    %error,
1878                    path = %file_path.display(),
1879                    "Failed to deserialize LightClientStateUpdateCertificateV2"
1880                );
1881
1882                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
1883                    .map(Into::into)
1884                    .with_context(|| {
1885                        format!(
1886                            "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1887                            file_path.display()
1888                        )
1889                    })
1890            })?;
1891
1892        Ok(Some(cert))
1893    }
1894
1895    async fn insert_state_cert(
1896        &self,
1897        epoch: u64,
1898        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1899    ) -> anyhow::Result<()> {
1900        let inner = self.inner.read().await;
1901        let dir_path = inner.finalized_state_cert_dir_path();
1902
1903        fs::create_dir_all(&dir_path)
1904            .context(format!("creating state cert dir {}", dir_path.display()))?;
1905
1906        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1907        let bytes = bincode::serialize(&cert)
1908            .context("serializing light client state update certificate")?;
1909
1910        fs::write(&file_path, bytes).context(format!(
1911            "writing light client state update certificate {}",
1912            file_path.display()
1913        ))?;
1914
1915        Ok(())
1916    }
1917
1918    fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
1919        // todo!()
1920    }
1921}
1922
1923#[async_trait]
1924impl MembershipPersistence for Persistence {
1925    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
1926        let inner = self.inner.read().await;
1927        let path = &inner.stake_table_dir_path();
1928        let file_path = path.join(epoch.to_string()).with_extension("txt");
1929
1930        if !file_path.exists() {
1931            return Ok(None);
1932        }
1933
1934        let bytes = fs::read(&file_path).with_context(|| {
1935            format!("failed to read stake table file at {}", file_path.display())
1936        })?;
1937
1938        let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1939            format!(
1940                "failed to deserialize stake table at {}",
1941                file_path.display()
1942            )
1943        })?;
1944        Ok(Some(stake))
1945    }
1946
1947    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
1948        let limit = limit as usize;
1949        let inner = self.inner.read().await;
1950        let path = &inner.stake_table_dir_path();
1951        let sorted_files = epoch_files(path)?
1952            .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
1953            .take(limit);
1954        let mut validator_sets: Vec<IndexedStake> = Vec::new();
1955
1956        for (epoch, file_path) in sorted_files {
1957            let bytes = fs::read(&file_path).with_context(|| {
1958                format!("failed to read stake table file at {}", file_path.display())
1959            })?;
1960
1961            let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1962                format!(
1963                    "failed to deserialize stake table at {}",
1964                    file_path.display()
1965                )
1966            })?;
1967            validator_sets.push((epoch, (stake.0, stake.1), stake.2));
1968        }
1969
1970        Ok(Some(validator_sets))
1971    }
1972
1973    async fn store_stake(
1974        &self,
1975        epoch: EpochNumber,
1976        stake: AuthenticatedValidatorMap,
1977        block_reward: Option<RewardAmount>,
1978        stake_table_hash: Option<StakeTableHash>,
1979    ) -> anyhow::Result<()> {
1980        let mut inner = self.inner.write().await;
1981        let dir_path = &inner.stake_table_dir_path();
1982
1983        fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1984
1985        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1986
1987        inner.replace(
1988            &file_path,
1989            |_| {
1990                // Always overwrite the previous file.
1991                Ok(true)
1992            },
1993            |mut file| {
1994                let data: StakeTuple = (stake, block_reward, stake_table_hash);
1995                let bytes =
1996                    bincode::serialize(&data).context("serializing combined stake table")?;
1997                file.write_all(&bytes)?;
1998                Ok(())
1999            },
2000        )
2001    }
2002
2003    /// store stake table events upto the l1 block
2004    async fn store_events(
2005        &self,
2006        to_l1_block: u64,
2007        events: Vec<(EventKey, StakeTableEvent)>,
2008    ) -> anyhow::Result<()> {
2009        let mut inner = self.inner.write().await;
2010        let dir_path = &inner.stake_table_dir_path();
2011        let events_dir = dir_path.join("events");
2012
2013        fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
2014        // Read the last l1 finalized for which events has been stored
2015        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
2016
2017        // check if the last l1 events is higher than the incoming one
2018        if last_l1_finalized_path.exists() {
2019            let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
2020                format!("Failed to read file at path: {last_l1_finalized_path:?}")
2021            })?;
2022            let mut buf = [0; 8];
2023            bytes
2024                .as_slice()
2025                .read_exact(&mut buf[..8])
2026                .with_context(|| {
2027                    format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
2028                })?;
2029            let persisted_l1_block = u64::from_le_bytes(buf);
2030            if persisted_l1_block > to_l1_block {
2031                tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
2032                return Ok(());
2033            }
2034        }
2035
2036        // stores each event in a separate file
2037        // this can cause performance issue when, for example, reading all the files
2038        // However, the plan is to remove file system completely in future
2039        for (event_key, event) in events {
2040            let (block_number, event_index) = event_key;
2041            // file name is like block_index.json
2042            let filename = format!("{block_number}_{event_index}");
2043            let file_path = events_dir.join(filename).with_extension("json");
2044
2045            if file_path.exists() {
2046                continue;
2047            }
2048
2049            inner
2050                .replace(
2051                    &file_path,
2052                    |_| Ok(true),
2053                    |file| {
2054                        let writer = BufWriter::new(file);
2055
2056                        serde_json::to_writer_pretty(writer, &event)?;
2057                        Ok(())
2058                    },
2059                )
2060                .context("Failed to write event to file")?;
2061        }
2062
2063        // update the l1 block for which we have processed events
2064        inner.replace(
2065            &last_l1_finalized_path,
2066            |_| Ok(true),
2067            |mut file| {
2068                let bytes = to_l1_block.to_le_bytes();
2069
2070                file.write_all(&bytes)?;
2071                tracing::debug!("updated l1 finalized ={to_l1_block:?}");
2072                Ok(())
2073            },
2074        )
2075    }
2076
2077    /// Loads all events from persistent storage up to the specified L1 block.
2078    ///
2079    /// # Returns
2080    ///
2081    /// Returns a tuple containing:
2082    /// - `Option<u64>` - The queried L1 block for which all events have been successfully fetched.
2083    /// - `Vec<(EventKey, StakeTableEvent)>` - A list of events, where each entry is a tuple of the event key
2084    /// event key is (l1 block number, log index)
2085    ///   and the corresponding StakeTable event.
2086    ///
2087    async fn load_events(
2088        &self,
2089        from_l1_block: u64,
2090        to_l1_block: u64,
2091    ) -> anyhow::Result<(
2092        Option<EventsPersistenceRead>,
2093        Vec<(EventKey, StakeTableEvent)>,
2094    )> {
2095        let inner = self.inner.read().await;
2096        let dir_path = inner.stake_table_dir_path();
2097        let events_dir = dir_path.join("events");
2098
2099        // check if we have any events in storage
2100        // we can do this by checking last l1 finalized block for which we processed events
2101        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
2102
2103        if !last_l1_finalized_path.exists() || !events_dir.exists() {
2104            return Ok((None, Vec::new()));
2105        }
2106
2107        let mut events = Vec::new();
2108
2109        let bytes = fs::read(&last_l1_finalized_path)
2110            .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
2111        let mut buf = [0; 8];
2112        bytes
2113            .as_slice()
2114            .read_exact(&mut buf[..8])
2115            .with_context(|| {
2116                format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
2117            })?;
2118
2119        let last_processed_l1_block = u64::from_le_bytes(buf);
2120
2121        // Determine the L1 block for querying events.
2122        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
2123        // Otherwise, query up to the last stored block.
2124        let query_l1_block = if last_processed_l1_block > to_l1_block {
2125            to_l1_block
2126        } else {
2127            last_processed_l1_block
2128        };
2129
2130        for entry in fs::read_dir(&events_dir).context("events directory")? {
2131            let entry = entry?;
2132            let path = entry.path();
2133
2134            if !entry.file_type()?.is_file() {
2135                continue;
2136            }
2137
2138            if path
2139                .extension()
2140                .context(format!("extension for path={path:?}"))?
2141                != "json"
2142            {
2143                continue;
2144            }
2145
2146            let filename = path
2147                .file_stem()
2148                .and_then(|f| f.to_str())
2149                .unwrap_or_default();
2150
2151            let parts: Vec<&str> = filename.split('_').collect();
2152            if parts.len() != 2 {
2153                continue;
2154            }
2155
2156            let block_number = parts[0].parse::<u64>()?;
2157            let log_index = parts[1].parse::<u64>()?;
2158
2159            if block_number < from_l1_block || block_number > query_l1_block {
2160                continue;
2161            }
2162
2163            let file =
2164                File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
2165            let reader = BufReader::new(file);
2166
2167            let event: StakeTableEvent = serde_json::from_reader(reader)
2168                .context(format!("Failed to deserialize event at path={path:?}"))?;
2169
2170            events.push(((block_number, log_index), event));
2171        }
2172
2173        events.sort_by_key(|(key, _)| *key);
2174
2175        if query_l1_block == to_l1_block {
2176            Ok((Some(EventsPersistenceRead::Complete), events))
2177        } else {
2178            Ok((
2179                Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
2180                events,
2181            ))
2182        }
2183    }
2184
2185    async fn delete_stake_tables(&self) -> anyhow::Result<()> {
2186        let inner = self.inner.write().await;
2187        let events_dir = inner.stake_table_dir_path().join("events");
2188        if events_dir.exists() {
2189            fs::remove_dir_all(&events_dir)
2190                .with_context(|| format!("Failed to remove events dir: {events_dir:?}"))?;
2191        }
2192        let validators_dir = inner.stake_table_dir_path().join("validators");
2193        if validators_dir.exists() {
2194            fs::remove_dir_all(&validators_dir)
2195                .with_context(|| format!("Failed to remove validators dir: {validators_dir:?}"))?;
2196        }
2197        let drb_dir = inner.epoch_drb_result_dir_path();
2198        if drb_dir.exists() {
2199            fs::remove_dir_all(&drb_dir)
2200                .with_context(|| format!("Failed to remove epoch DRB result dir: {drb_dir:?}"))?;
2201        }
2202        Ok(())
2203    }
2204
2205    async fn store_all_validators(
2206        &self,
2207        epoch: EpochNumber,
2208        all_validators: RegisteredValidatorMap,
2209    ) -> anyhow::Result<()> {
2210        let mut inner = self.inner.write().await;
2211        let dir_path = inner.stake_table_dir_path();
2212        let validators_dir = dir_path.join("validators");
2213
2214        // Ensure validators directory exists
2215        fs::create_dir_all(&validators_dir)
2216            .with_context(|| format!("Failed to create validators dir: {validators_dir:?}"))?;
2217
2218        // Path = validators/epoch_<number>.json
2219        let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2220
2221        inner
2222            .replace(
2223                &file_path,
2224                |_| Ok(true),
2225                |file| {
2226                    let writer = BufWriter::new(file);
2227
2228                    serde_json::to_writer_pretty(writer, &all_validators).with_context(|| {
2229                        format!("Failed to serialize validators for epoch {epoch}")
2230                    })?;
2231                    Ok(())
2232                },
2233            )
2234            .with_context(|| format!("Failed to write validator file: {file_path:?}"))?;
2235
2236        Ok(())
2237    }
2238
2239    async fn load_all_validators(
2240        &self,
2241        epoch: EpochNumber,
2242        offset: u64,
2243        limit: u64,
2244    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
2245        let inner = self.inner.read().await;
2246        let dir_path = inner.stake_table_dir_path();
2247        let validators_dir = dir_path.join("validators");
2248        let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2249
2250        if !file_path.exists() {
2251            bail!("Validator file not found for epoch {epoch}");
2252        }
2253
2254        let file = File::open(&file_path)
2255            .with_context(|| format!("Failed to open validator file: {file_path:?}"))?;
2256        let reader = BufReader::new(file);
2257
2258        let map: RegisteredValidatorMap = serde_json::from_reader(reader).with_context(|| {
2259            format!("Failed to deserialize validators at {file_path:?}. epoch = {epoch}")
2260        })?;
2261
2262        let mut values: Vec<RegisteredValidator<PubKey>> = map.into_values().collect();
2263        values.sort_by_key(|v| v.account);
2264
2265        let start = offset as usize;
2266        let end = (start + limit as usize).min(values.len());
2267
2268        if start >= values.len() {
2269            return Ok(vec![]);
2270        }
2271
2272        Ok(values[start..end].to_vec())
2273    }
2274}
2275
2276#[async_trait]
2277impl DhtPersistentStorage for Persistence {
2278    /// Save the DHT to the file on disk
2279    ///
2280    /// # Errors
2281    /// - If we fail to serialize the records
2282    /// - If we fail to write the serialized records to the file
2283    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2284        // Bincode-serialize the records
2285        let to_save =
2286            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2287
2288        // Get the path to save the file to
2289        let path = self.inner.read().await.libp2p_dht_path();
2290
2291        // Create the directory if it doesn't exist
2292        fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
2293            .with_context(|| "failed to create directory")?;
2294
2295        // Get a write lock on the inner struct
2296        let mut inner = self.inner.write().await;
2297
2298        // Save the file, replacing the previous one if it exists
2299        inner
2300            .replace(
2301                &path,
2302                |_| {
2303                    // Always overwrite the previous file
2304                    Ok(true)
2305                },
2306                |mut file| {
2307                    file.write_all(&to_save)
2308                        .with_context(|| "failed to write records to file")?;
2309                    Ok(())
2310                },
2311            )
2312            .with_context(|| "failed to save records to file")?;
2313
2314        Ok(())
2315    }
2316
2317    /// Load the DHT from the file on disk
2318    ///
2319    /// # Errors
2320    /// - If we fail to read the file
2321    /// - If we fail to deserialize the records
2322    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2323        // Read the contents of the file
2324        let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
2325            .with_context(|| "Failed to read records from file")?;
2326
2327        // Deserialize the contents
2328        let records: Vec<SerializableRecord> =
2329            bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
2330
2331        Ok(records)
2332    }
2333}
2334
2335/// Get all paths under `dir` whose name is of the form <view number>.txt.
2336fn view_files(
2337    dir: impl AsRef<Path>,
2338) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
2339    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2340        let dir = dir.as_ref().display();
2341        let entry = entry.ok()?;
2342        if !entry.file_type().ok()?.is_file() {
2343            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2344            return None;
2345        }
2346        let path = entry.path();
2347        if path.extension()? != "txt" {
2348            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2349            return None;
2350        }
2351        let file_name = path.file_stem()?;
2352        let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2353            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2354            return None;
2355        };
2356        Some((ViewNumber::new(view_number), entry.path().to_owned()))
2357    }))
2358}
2359
2360/// Get all paths under `dir` whose name is of the form <epoch number>.txt.
2361/// Should probably be made generic and merged with view_files.
2362fn epoch_files(
2363    dir: impl AsRef<Path>,
2364) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2365    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2366        let dir = dir.as_ref().display();
2367        let entry = entry.ok()?;
2368        if !entry.file_type().ok()?.is_file() {
2369            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2370            return None;
2371        }
2372        let path = entry.path();
2373        if path.extension()? != "txt" {
2374            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2375            return None;
2376        }
2377        let file_name = path.file_stem()?;
2378        let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2379            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2380            return None;
2381        };
2382        Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2383    }))
2384}
2385
2386#[cfg(test)]
2387mod test {
2388    use std::marker::PhantomData;
2389
2390    use committable::{Commitment, CommitmentBoundsArkless, Committable};
2391    use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2392    use hotshot::types::SignatureKey;
2393    use hotshot_example_types::node_types::TEST_VERSIONS;
2394    use hotshot_query_service::testing::mocks::MOCK_UPGRADE;
2395    use hotshot_types::{
2396        data::QuorumProposal2,
2397        light_client::LightClientState,
2398        simple_certificate::QuorumCertificate,
2399        simple_vote::{QuorumData, Vote2Data},
2400        traits::{EncodeBytes, block_contents::GENESIS_VID_NUM_STORAGE_NODES},
2401        vid::advz::advz_scheme,
2402    };
2403    use jf_advz::VidScheme;
2404    use serde_json::json;
2405    use tempfile::TempDir;
2406
2407    use super::*;
2408    use crate::{BLSPubKey, persistence::tests::TestablePersistence};
2409
2410    #[async_trait]
2411    impl TestablePersistence for Persistence {
2412        type Storage = TempDir;
2413
2414        async fn tmp_storage() -> Self::Storage {
2415            TempDir::new().unwrap()
2416        }
2417
2418        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
2419            Options::new(storage.path().into())
2420        }
2421    }
2422
2423    #[test]
2424    fn test_config_migrations_add_builder_urls() {
2425        let before = json!({
2426            "config": {
2427                "builder_url": "https://test:8080",
2428                "start_proposing_view": 1,
2429                "stop_proposing_view": 2,
2430                "start_voting_view": 1,
2431                "stop_voting_view": 2,
2432                "start_proposing_time": 1,
2433                "stop_proposing_time": 2,
2434                "start_voting_time": 1,
2435                "stop_voting_time": 2
2436            }
2437        });
2438        let after = json!({
2439            "config": {
2440                "builder_urls": ["https://test:8080"],
2441                "start_proposing_view": 1,
2442                "stop_proposing_view": 2,
2443                "start_voting_view": 1,
2444                "stop_voting_view": 2,
2445                "start_proposing_time": 1,
2446                "stop_proposing_time": 2,
2447                "start_voting_time": 1,
2448                "stop_voting_time": 2,
2449                "epoch_height": 0,
2450                "drb_difficulty": 0,
2451                "drb_upgrade_difficulty": 0,
2452                "da_committees": [],
2453            }
2454        });
2455
2456        assert_eq!(migrate_network_config(before).unwrap(), after);
2457    }
2458
2459    #[test]
2460    fn test_config_migrations_existing_builder_urls() {
2461        let before = json!({
2462            "config": {
2463                "builder_urls": ["https://test:8080", "https://test:8081"],
2464                "start_proposing_view": 1,
2465                "stop_proposing_view": 2,
2466                "start_voting_view": 1,
2467                "stop_voting_view": 2,
2468                "start_proposing_time": 1,
2469                "stop_proposing_time": 2,
2470                "start_voting_time": 1,
2471                "stop_voting_time": 2,
2472                "epoch_height": 0,
2473                "drb_difficulty": 0,
2474                "drb_upgrade_difficulty": 0,
2475                "da_committees": [],
2476            }
2477        });
2478
2479        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2480    }
2481
2482    #[test]
2483    fn test_config_migrations_add_upgrade_params() {
2484        let before = json!({
2485            "config": {
2486                "builder_urls": ["https://test:8080", "https://test:8081"]
2487            }
2488        });
2489        let after = json!({
2490            "config": {
2491                "builder_urls": ["https://test:8080", "https://test:8081"],
2492                "start_proposing_view": 9007199254740991u64,
2493                "stop_proposing_view": 0,
2494                "start_voting_view": 9007199254740991u64,
2495                "stop_voting_view": 0,
2496                "start_proposing_time": 9007199254740991u64,
2497                "stop_proposing_time": 0,
2498                "start_voting_time": 9007199254740991u64,
2499                "stop_voting_time": 0,
2500                "epoch_height": 0,
2501                "drb_difficulty": 0,
2502                "drb_upgrade_difficulty": 0,
2503                "da_committees": [],
2504            }
2505        });
2506
2507        assert_eq!(migrate_network_config(before).unwrap(), after);
2508    }
2509
2510    #[test]
2511    fn test_config_migrations_existing_upgrade_params() {
2512        let before = json!({
2513            "config": {
2514                "builder_urls": ["https://test:8080", "https://test:8081"],
2515                "start_proposing_view": 1,
2516                "stop_proposing_view": 2,
2517                "start_voting_view": 1,
2518                "stop_voting_view": 2,
2519                "start_proposing_time": 1,
2520                "stop_proposing_time": 2,
2521                "start_voting_time": 1,
2522                "stop_voting_time": 2,
2523                "epoch_height": 0,
2524                "drb_difficulty": 0,
2525                "drb_upgrade_difficulty": 0,
2526                "da_committees": [],
2527            }
2528        });
2529
2530        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2531    }
2532
2533    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2534    pub async fn test_consensus_migration() {
2535        let rows = 300;
2536        let tmp = Persistence::tmp_storage().await;
2537        let mut opt = Persistence::options(&tmp);
2538        let storage = opt.create().await.unwrap();
2539
2540        let inner = storage.inner.read().await;
2541
2542        let decided_leaves_path = inner.decided_leaf_path();
2543        fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create proposals dir");
2544
2545        let qp_dir_path = inner.quorum_proposals_dir_path();
2546        fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");
2547
2548        let state_cert_dir_path = inner.state_cert_dir_path();
2549        fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
2550        drop(inner);
2551
2552        assert!(storage.load_state_cert().await.unwrap().is_none());
2553
2554        for i in 0..rows {
2555            let view = ViewNumber::new(i);
2556            let validated_state = ValidatedState::default();
2557            let instance_state = NodeState::default();
2558
2559            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
2560            let (payload, metadata) =
2561                Payload::from_transactions([], &validated_state, &instance_state)
2562                    .await
2563                    .unwrap();
2564
2565            let payload_bytes = payload.encode();
2566
2567            let block_header = Header::genesis(
2568                &instance_state,
2569                payload.clone(),
2570                &metadata,
2571                TEST_VERSIONS.test.base,
2572            );
2573
2574            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
2575                epoch: EpochNumber::new(i),
2576                light_client_state: LightClientState {
2577                    view_number: i,
2578                    block_height: i,
2579                    block_comm_root: Default::default(),
2580                },
2581                next_stake_table_state: Default::default(),
2582                signatures: vec![], // filling arbitrary value
2583                auth_root: Default::default(),
2584            };
2585            assert!(storage.add_state_cert(state_cert).await.is_ok());
2586
2587            let null_quorum_data = QuorumData {
2588                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
2589            };
2590
2591            let justify_qc = QuorumCertificate::new(
2592                null_quorum_data.clone(),
2593                null_quorum_data.commit(),
2594                view,
2595                None,
2596                PhantomData,
2597            );
2598
2599            let quorum_proposal = QuorumProposal {
2600                block_header,
2601                view_number: view,
2602                justify_qc: justify_qc.clone(),
2603                upgrade_certificate: None,
2604                proposal_certificate: None,
2605            };
2606
2607            let quorum_proposal_signature =
2608                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2609                    .expect("Failed to sign quorum proposal");
2610
2611            let proposal = Proposal {
2612                data: quorum_proposal.clone(),
2613                signature: quorum_proposal_signature,
2614                _pd: PhantomData::<SeqTypes>,
2615            };
2616
2617            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
2618            leaf.fill_block_payload(
2619                payload,
2620                GENESIS_VID_NUM_STORAGE_NODES,
2621                TEST_VERSIONS.test.base,
2622            )
2623            .unwrap();
2624
2625            let mut inner = storage.inner.write().await;
2626
2627            tracing::debug!("inserting decided leaves");
2628            let file_path = decided_leaves_path
2629                .join(view.to_string())
2630                .with_extension("txt");
2631
2632            tracing::debug!("inserting decided leaves");
2633
2634            inner
2635                .replace(
2636                    &file_path,
2637                    |_| Ok(true),
2638                    |mut file| {
2639                        let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
2640                        file.write_all(&bytes)?;
2641                        Ok(())
2642                    },
2643                )
2644                .expect("replace decided leaves");
2645
2646            let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");
2647
2648            tracing::debug!("inserting qc for {view}");
2649
2650            inner
2651                .replace(
2652                    &file_path,
2653                    |_| Ok(true),
2654                    |mut file| {
2655                        let proposal_bytes =
2656                            bincode::serialize(&proposal).context("serialize proposal")?;
2657
2658                        file.write_all(&proposal_bytes)?;
2659                        Ok(())
2660                    },
2661                )
2662                .unwrap();
2663
2664            drop(inner);
2665            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
2666                .disperse(payload_bytes.clone())
2667                .unwrap();
2668
2669            let vid = VidDisperseShare0::<SeqTypes> {
2670                view_number: ViewNumber::new(i),
2671                payload_commitment: Default::default(),
2672                share: disperse.shares[0].clone(),
2673                common: disperse.common,
2674                recipient_key: pubkey,
2675            };
2676
2677            let (payload, metadata) =
2678                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
2679                    .await
2680                    .unwrap();
2681
2682            let da = DaProposal::<SeqTypes> {
2683                encoded_transactions: payload.encode(),
2684                metadata,
2685                view_number: ViewNumber::new(i),
2686            };
2687
2688            let block_payload_signature =
2689                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
2690
2691            let da_proposal = Proposal {
2692                data: da,
2693                signature: block_payload_signature,
2694                _pd: Default::default(),
2695            };
2696
2697            tracing::debug!("inserting vid for {view}");
2698            storage
2699                .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
2700                .await
2701                .unwrap();
2702
2703            tracing::debug!("inserting da for {view}");
2704            storage
2705                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
2706                .await
2707                .unwrap();
2708        }
2709
2710        storage.migrate_storage().await.unwrap();
2711        let inner = storage.inner.read().await;
2712        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2713        let decided_leaves_count = decided_leaves
2714            .filter_map(Result::ok)
2715            .filter(|e| e.path().is_file())
2716            .count();
2717        assert_eq!(
2718            decided_leaves_count, rows as usize,
2719            "decided leaves count does not match",
2720        );
2721
2722        let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
2723        let da_proposals_count = da_proposals
2724            .filter_map(Result::ok)
2725            .filter(|e| e.path().is_file())
2726            .count();
2727        assert_eq!(
2728            da_proposals_count, rows as usize,
2729            "da proposals does not match",
2730        );
2731
2732        let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
2733        let vids_count = vids
2734            .filter_map(Result::ok)
2735            .filter(|e| e.path().is_file())
2736            .count();
2737        assert_eq!(vids_count, rows as usize, "vid shares count does not match",);
2738
2739        let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
2740        let qps_count = qps
2741            .filter_map(Result::ok)
2742            .filter(|e| e.path().is_file())
2743            .count();
2744        assert_eq!(
2745            qps_count, rows as usize,
2746            "quorum proposals count does not match",
2747        );
2748
2749        let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
2750        let state_cert_count = state_certs
2751            .filter_map(Result::ok)
2752            .filter(|e| e.path().is_file())
2753            .count();
2754        assert_eq!(
2755            state_cert_count, rows as usize,
2756            "light client state update certificate count does not match",
2757        );
2758
2759        // Reinitialize the file system persistence using the same path.
2760        // re run the consensus migration.
2761        // No changes will occur, as the migration has already been completed.
2762        let storage = opt.create().await.unwrap();
2763        storage.migrate_storage().await.unwrap();
2764
2765        let inner = storage.inner.read().await;
2766        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2767        let decided_leaves_count = decided_leaves
2768            .filter_map(Result::ok)
2769            .filter(|e| e.path().is_file())
2770            .count();
2771        assert_eq!(
2772            decided_leaves_count, rows as usize,
2773            "decided leaves count does not match",
2774        );
2775    }
2776
2777    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2778    async fn test_load_quorum_proposals_invalid_extension() {
2779        let tmp = Persistence::tmp_storage().await;
2780        let storage = Persistence::connect(&tmp).await;
2781
2782        // Generate a couple of valid quorum proposals.
2783        let leaf = Leaf2::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base).await;
2784        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2785        let signature = PubKey::sign(&privkey, &[]).unwrap();
2786        let mut quorum_proposal = Proposal {
2787            data: QuorumProposalWrapper::<SeqTypes> {
2788                proposal: QuorumProposal2::<SeqTypes> {
2789                    epoch: None,
2790                    block_header: leaf.block_header().clone(),
2791                    view_number: ViewNumber::genesis(),
2792                    justify_qc: QuorumCertificate2::genesis(
2793                        &Default::default(),
2794                        &NodeState::mock(),
2795                        TEST_VERSIONS.test,
2796                    )
2797                    .await,
2798                    upgrade_certificate: None,
2799                    view_change_evidence: None,
2800                    next_drb_result: None,
2801                    next_epoch_justify_qc: None,
2802                    state_cert: None,
2803                },
2804            },
2805            signature,
2806            _pd: Default::default(),
2807        };
2808
2809        // Store quorum proposals.
2810        let quorum_proposal1 = quorum_proposal.clone();
2811        storage
2812            .append_quorum_proposal2(&quorum_proposal1)
2813            .await
2814            .unwrap();
2815        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2816        let quorum_proposal2 = quorum_proposal.clone();
2817        storage
2818            .append_quorum_proposal2(&quorum_proposal2)
2819            .await
2820            .unwrap();
2821
2822        // Change one of the file extensions. It can happen that we end up with files with the wrong
2823        // extension if, for example, the node is killed before cleaning up a swap file.
2824        fs::rename(
2825            tmp.path().join("quorum_proposals2/1.txt"),
2826            tmp.path().join("quorum_proposals2/1.swp"),
2827        )
2828        .unwrap();
2829
2830        // Loading should simply ignore the unrecognized extension.
2831        assert_eq!(
2832            storage.load_quorum_proposals().await.unwrap(),
2833            [(ViewNumber::genesis(), quorum_proposal1)]
2834                .into_iter()
2835                .collect::<BTreeMap<_, _>>()
2836        );
2837    }
2838
2839    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2840    async fn test_cert2_persisted_as_bin() {
2841        let tmp = Persistence::tmp_storage().await;
2842        let storage = Persistence::connect(&tmp).await;
2843        let view = ViewNumber::new(7);
2844        let leaf = Leaf2::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base).await;
2845        let data = Vote2Data {
2846            leaf_commit: leaf.commit(),
2847            epoch: EpochNumber::new(1),
2848            block_number: leaf.height(),
2849        };
2850        let cert2 = Certificate2::new(data.clone(), data.commit(), view, None, PhantomData);
2851
2852        storage.append_cert2(view, cert2.clone()).await.unwrap();
2853
2854        assert!(tmp.path().join("decided_cert2/7.bin").is_file());
2855        assert!(!tmp.path().join("decided_cert2/7.txt").exists());
2856        assert_eq!(storage.load_cert2(view).await.unwrap(), Some(cert2));
2857    }
2858
2859    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2860    async fn test_load_quorum_proposals_malformed_data() {
2861        let tmp = Persistence::tmp_storage().await;
2862        let storage = Persistence::connect(&tmp).await;
2863
2864        // Generate a valid quorum proposal.
2865        let leaf: Leaf2 = Leaf::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base)
2866            .await
2867            .into();
2868        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2869        let signature = PubKey::sign(&privkey, &[]).unwrap();
2870        let quorum_proposal = Proposal {
2871            data: QuorumProposalWrapper::<SeqTypes> {
2872                proposal: QuorumProposal2::<SeqTypes> {
2873                    epoch: None,
2874                    block_header: leaf.block_header().clone(),
2875                    view_number: ViewNumber::new(1),
2876                    justify_qc: QuorumCertificate2::genesis(
2877                        &Default::default(),
2878                        &NodeState::mock(),
2879                        TEST_VERSIONS.test,
2880                    )
2881                    .await,
2882                    upgrade_certificate: None,
2883                    view_change_evidence: None,
2884                    next_drb_result: None,
2885                    next_epoch_justify_qc: None,
2886                    state_cert: None,
2887                },
2888            },
2889            signature,
2890            _pd: Default::default(),
2891        };
2892
2893        // First store an invalid quorum proposal.
2894        fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2895        fs::write(
2896            tmp.path().join("quorum_proposals2/0.txt"),
2897            "invalid data".as_bytes(),
2898        )
2899        .unwrap();
2900
2901        // Store valid quorum proposal.
2902        storage
2903            .append_quorum_proposal2(&quorum_proposal)
2904            .await
2905            .unwrap();
2906
2907        // Loading should ignore the invalid data and return the valid proposal.
2908        assert_eq!(
2909            storage.load_quorum_proposals().await.unwrap(),
2910            [(ViewNumber::new(1), quorum_proposal)]
2911                .into_iter()
2912                .collect::<BTreeMap<_, _>>()
2913        );
2914    }
2915
2916    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2917    async fn test_store_events_empty() {
2918        let tmp = Persistence::tmp_storage().await;
2919        let mut opt = Persistence::options(&tmp);
2920        let storage = opt.create().await.unwrap();
2921
2922        assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
2923
2924        // Storing an empty events list still updates the latest L1 block.
2925        for i in 1..=2 {
2926            tracing::info!(i, "update l1 height");
2927            storage.store_events(i, vec![]).await.unwrap();
2928            assert_eq!(
2929                storage.load_events(0, 100).await.unwrap(),
2930                (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
2931            );
2932        }
2933    }
2934
2935    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2936    async fn test_migrate_x25519_keys() {
2937        use std::collections::HashMap;
2938
2939        use alloy::primitives::{Address, U256};
2940        use indexmap::IndexMap;
2941
2942        use crate::persistence::RegisteredValidatorNoX25519;
2943
2944        let tmp = Persistence::tmp_storage().await;
2945        let mut opt = Persistence::options(&tmp);
2946        let storage = opt.create().await.unwrap();
2947
2948        let addr = Address::random();
2949        let legacy_validator = RegisteredValidatorNoX25519 {
2950            account: addr,
2951            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2952            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2953            stake: U256::from(1000),
2954            commission: 100,
2955            delegators: HashMap::new(),
2956            authenticated: true,
2957        };
2958
2959        // Serialize bincode stake table in legacy format (no x25519 fields)
2960        let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2961        legacy_map.insert(addr, legacy_validator);
2962
2963        type LegacyTuple = (
2964            IndexMap<Address, RegisteredValidatorNoX25519>,
2965            Option<RewardAmount>,
2966            Option<StakeTableHash>,
2967        );
2968        let legacy_data: LegacyTuple = (legacy_map, None, None);
2969        let bytes = bincode::serialize(&legacy_data).unwrap();
2970
2971        let inner = storage.inner.read().await;
2972        let path = inner.stake_table_dir_path();
2973        drop(inner);
2974        fs::create_dir_all(&path).unwrap();
2975        fs::write(path.join("1.txt"), &bytes).unwrap();
2976
2977        // Also create a JSON validators file without x25519 fields
2978        let json_validator = RegisteredValidatorNoX25519 {
2979            account: addr,
2980            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2981            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2982            stake: U256::from(2000),
2983            commission: 200,
2984            delegators: HashMap::new(),
2985            authenticated: true,
2986        };
2987        let mut json_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2988        json_map.insert(addr, json_validator);
2989        let validators_dir = path.join("validators");
2990        fs::create_dir_all(&validators_dir).unwrap();
2991        let json_path = validators_dir.join("epoch_1.json");
2992        fs::write(&json_path, serde_json::to_string_pretty(&json_map).unwrap()).unwrap();
2993
2994        // Run migration
2995        storage.migrate_x25519_keys().await.unwrap();
2996
2997        // Bincode file should now be loadable as current format
2998        let result = storage.load_stake(EpochNumber::new(1)).await.unwrap();
2999        assert!(result.is_some());
3000        let (validators, reward, hash) = result.unwrap();
3001        assert_eq!(validators.len(), 1);
3002        let v = validators.get(&addr).unwrap();
3003        assert_eq!(v.stake, U256::from(1000));
3004        assert!(v.x25519_key.is_none());
3005        assert!(v.p2p_addr.is_none());
3006        assert!(reward.is_none());
3007        assert!(hash.is_none());
3008
3009        // JSON file should now be loadable as RegisteredValidatorMap
3010        let json_content = fs::read_to_string(&json_path).unwrap();
3011        let parsed: espresso_types::RegisteredValidatorMap =
3012            serde_json::from_str(&json_content).unwrap();
3013        assert_eq!(parsed.len(), 1);
3014        let json_v = parsed.get(&addr).unwrap();
3015        assert_eq!(json_v.stake, U256::from(2000));
3016        assert!(json_v.x25519_key.is_none());
3017
3018        // Idempotent: running again is a no-op
3019        storage.migrate_x25519_keys().await.unwrap();
3020        let result2 = storage.load_stake(EpochNumber::new(1)).await;
3021        assert!(result2.is_ok());
3022    }
3023
3024    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3025    async fn test_migrate_x25519_keys_no_stake_dir() {
3026        let tmp = Persistence::tmp_storage().await;
3027        let mut opt = Persistence::options(&tmp);
3028        let storage = opt.create().await.unwrap();
3029
3030        // No stake_table dir exists. Migration should succeed.
3031        storage.migrate_x25519_keys().await.unwrap();
3032
3033        // Create stake_table dir with garbage data.
3034        let inner = storage.inner.read().await;
3035        let path = inner.stake_table_dir_path();
3036        drop(inner);
3037        fs::create_dir_all(&path).unwrap();
3038        fs::write(path.join("1.txt"), b"garbage").unwrap();
3039
3040        // If migration was properly marked done, this is a no-op.
3041        storage.migrate_x25519_keys().await.unwrap();
3042    }
3043
3044    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3045    async fn test_store_all_validators_authenticated_and_unauthenticated() {
3046        use std::collections::HashMap;
3047
3048        use alloy::primitives::{Address, U256};
3049        use espresso_types::v0_3::RegisteredValidator;
3050        use indexmap::IndexMap;
3051
3052        let tmp = Persistence::tmp_storage().await;
3053        let mut opt = Persistence::options(&tmp);
3054        let storage = opt.create().await.unwrap();
3055
3056        // Create an authenticated validator
3057        let authenticated_validator = RegisteredValidator {
3058            account: Address::random(),
3059            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
3060            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
3061            stake: U256::from(1000),
3062            commission: 100,
3063            delegators: HashMap::new(),
3064            authenticated: true,
3065            x25519_key: None,
3066            p2p_addr: None,
3067        };
3068
3069        // Create an unauthenticated validator
3070        let unauthenticated_validator = RegisteredValidator {
3071            account: Address::random(),
3072            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
3073            state_ver_key: hotshot_types::light_client::StateVerKey::default(),
3074            stake: U256::from(2000),
3075            commission: 200,
3076            delegators: HashMap::new(),
3077            authenticated: false,
3078            x25519_key: None,
3079            p2p_addr: None,
3080        };
3081
3082        let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
3083        validators.insert(
3084            authenticated_validator.account,
3085            authenticated_validator.clone(),
3086        );
3087        validators.insert(
3088            unauthenticated_validator.account,
3089            unauthenticated_validator.clone(),
3090        );
3091
3092        // Store both validators
3093        storage
3094            .store_all_validators(EpochNumber::new(1), validators)
3095            .await
3096            .unwrap();
3097
3098        // Load and verify
3099        let loaded = storage
3100            .load_all_validators(EpochNumber::new(1), 0, 100)
3101            .await
3102            .unwrap();
3103        assert_eq!(loaded.len(), 2);
3104
3105        // Find each validator and verify authenticated state is preserved
3106        let loaded_auth = loaded
3107            .iter()
3108            .find(|v| v.account == authenticated_validator.account)
3109            .unwrap();
3110        assert!(
3111            loaded_auth.authenticated,
3112            "authenticated validator should remain authenticated"
3113        );
3114
3115        let loaded_unauth = loaded
3116            .iter()
3117            .find(|v| v.account == unauthenticated_validator.account)
3118            .unwrap();
3119        assert!(
3120            !loaded_unauth.authenticated,
3121            "unauthenticated validator should remain unauthenticated"
3122        );
3123    }
3124}