1use std::{
2 collections::{BTreeMap, HashSet},
3 fs::{self, File, OpenOptions},
4 io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5 ops::RangeInclusive,
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::Instant,
9};
10
11use alloy::primitives::Address;
12use anyhow::{Context, anyhow, bail};
13use async_lock::RwLock;
14use async_trait::async_trait;
15use clap::Parser;
16use espresso_types::{
17 AuthenticatedValidatorMap, Leaf, Leaf2, NetworkConfig, Payload, PubKey, RegisteredValidatorMap,
18 SeqTypes, StakeTableHash,
19 traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
20 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
21 v0_3::{
22 AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
23 StakeTableEvent,
24 },
25};
26use hotshot::InitializerEpochInfo;
27use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
28 DhtPersistentStorage, SerializableRecord,
29};
30use hotshot_new_protocol::message::Certificate2;
31use hotshot_types::{
32 data::{
33 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
34 QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare, VidDisperseShare0,
35 },
36 drb::{DrbInput, DrbResult},
37 event::{Event, EventType, HotShotAction, LeafInfo},
38 message::{Proposal, convert_proposal},
39 new_protocol::CoordinatorEvent,
40 simple_certificate::{
41 CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
42 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
43 },
44 traits::{
45 block_contents::{BlockHeader, BlockPayload},
46 metrics::Metrics,
47 node_implementation::NodeType,
48 },
49 vote::HasViewNumber,
50};
51use itertools::Itertools;
52
53use super::RegisteredValidatorNoX25519;
54use crate::{
55 RECENT_STAKE_TABLES_LIMIT, ViewNumber,
56 persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
57};
58
59fn deserialize_stake_table(bytes: &[u8]) -> anyhow::Result<(StakeTuple, bool)> {
62 if let Ok(stake) = bincode::deserialize::<StakeTuple>(bytes) {
64 return Ok((stake, false));
65 }
66
67 type LegacyMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;
69 type LegacyTuple = (LegacyMap, Option<RewardAmount>, Option<StakeTableHash>);
70 let legacy: LegacyTuple = bincode::deserialize(bytes)
71 .context("failed to deserialize stake table (tried current and legacy formats)")?;
72 let migrated: AuthenticatedValidatorMap = legacy
73 .0
74 .into_iter()
75 .map(|(addr, v)| {
76 let registered = v.migrate();
77 (
78 addr,
79 AuthenticatedValidator::try_from(registered)
80 .expect("stake tables only contain authenticated validators"),
81 )
82 })
83 .collect();
84 Ok(((migrated, legacy.1, legacy.2), true))
85}
86
87#[derive(Parser, Clone, Debug)]
89pub struct Options {
90 #[clap(long, env = "ESPRESSO_NODE_STORAGE_PATH")]
92 pub(crate) path: PathBuf,
93
94 #[clap(
106 long,
107 env = "ESPRESSO_NODE_CONSENSUS_VIEW_RETENTION",
108 default_value = "130000"
109 )]
110 pub(crate) consensus_view_retention: u64,
111}
112
113impl Default for Options {
114 fn default() -> Self {
115 Self::parse_from(std::iter::empty::<String>())
116 }
117}
118
119impl Options {
120 pub fn new(path: PathBuf) -> Self {
121 Self {
122 path,
123 consensus_view_retention: 130000,
124 }
125 }
126
127 pub(crate) fn path(&self) -> &Path {
128 &self.path
129 }
130}
131
132#[async_trait]
133impl PersistenceOptions for Options {
134 type Persistence = Persistence;
135
136 fn set_view_retention(&mut self, view_retention: u64) {
137 self.consensus_view_retention = view_retention;
138 }
139
140 async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
141 let path = self.path.clone();
142 let view_retention = self.consensus_view_retention;
143
144 let migration_path = path.join("migration");
145 let migrated = if migration_path.is_file() {
146 let bytes = fs::read(&migration_path).context(format!(
147 "unable to read migration from {}",
148 migration_path.display()
149 ))?;
150 bincode::deserialize(&bytes).context("malformed migration file")?
151 } else {
152 HashSet::new()
153 };
154
155 Ok(Persistence {
156 inner: Arc::new(RwLock::new(Inner {
157 path,
158 migrated,
159 view_retention,
160 })),
161 metrics: Arc::new(PersistenceMetricsValue::default()),
162 })
163 }
164
165 async fn reset(self) -> anyhow::Result<()> {
166 todo!()
167 }
168}
169
170#[derive(Clone, Debug)]
172pub struct Persistence {
173 inner: Arc<RwLock<Inner>>,
177 metrics: Arc<PersistenceMetricsValue>,
179}
180
181#[derive(Debug)]
182struct Inner {
183 path: PathBuf,
184 view_retention: u64,
185 migrated: HashSet<String>,
186}
187
188impl Inner {
189 fn config_path(&self) -> PathBuf {
190 self.path.join("hotshot.cfg")
191 }
192
193 fn migration(&self) -> PathBuf {
194 self.path.join("migration")
195 }
196
197 fn voted_view_path(&self) -> PathBuf {
198 self.path.join("highest_voted_view")
199 }
200
201 fn restart_view_path(&self) -> PathBuf {
202 self.path.join("restart_view")
203 }
204
205 fn decided_leaf_path(&self) -> PathBuf {
207 self.path.join("decided_leaves")
208 }
209
210 fn decided_leaf2_path(&self) -> PathBuf {
211 self.path.join("decided_leaves2")
212 }
213
214 fn legacy_anchor_leaf_path(&self) -> PathBuf {
216 self.path.join("anchor_leaf")
217 }
218
219 fn vid_dir_path(&self) -> PathBuf {
220 self.path.join("vid")
221 }
222
223 fn vid2_dir_path(&self) -> PathBuf {
224 self.path.join("vid2")
225 }
226
227 fn da_dir_path(&self) -> PathBuf {
228 self.path.join("da")
229 }
230
231 fn drb_dir_path(&self) -> PathBuf {
232 self.path.join("drb")
233 }
234
235 fn da2_dir_path(&self) -> PathBuf {
236 self.path.join("da2")
237 }
238
239 fn quorum_proposals_dir_path(&self) -> PathBuf {
240 self.path.join("quorum_proposals")
241 }
242
243 fn quorum_proposals2_dir_path(&self) -> PathBuf {
244 self.path.join("quorum_proposals2")
245 }
246
247 fn upgrade_certificate_dir_path(&self) -> PathBuf {
248 self.path.join("upgrade_certificate")
249 }
250
251 fn stake_table_dir_path(&self) -> PathBuf {
252 self.path.join("stake_table")
253 }
254
255 fn next_epoch_qc(&self) -> PathBuf {
256 self.path.join("next_epoch_quorum_certificate")
257 }
258
259 fn eqc(&self) -> PathBuf {
260 self.path.join("eqc")
261 }
262
263 fn libp2p_dht_path(&self) -> PathBuf {
264 self.path.join("libp2p_dht")
265 }
266 fn epoch_drb_result_dir_path(&self) -> PathBuf {
267 self.path.join("epoch_drb_result")
268 }
269
270 fn epoch_root_block_header_dir_path(&self) -> PathBuf {
271 self.path.join("epoch_root_block_header")
272 }
273
274 fn finalized_state_cert_dir_path(&self) -> PathBuf {
275 self.path.join("finalized_state_cert")
276 }
277
278 fn state_cert_dir_path(&self) -> PathBuf {
279 self.path.join("state_cert")
280 }
281
282 fn decided_cert2_dir_path(&self) -> PathBuf {
283 self.path.join("decided_cert2")
284 }
285
286 fn load_cert2(&self, view: ViewNumber) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
291 let file_path = self
292 .decided_cert2_dir_path()
293 .join(view.u64().to_string())
294 .with_extension("bin");
295 if !file_path.is_file() {
296 return Ok(None);
297 }
298 let bytes = fs::read(&file_path).context("read cert2")?;
299 Ok(Some(
300 bincode::deserialize(&bytes).context("deserialize cert2")?,
301 ))
302 }
303
304 fn update_migration(&mut self) -> anyhow::Result<()> {
305 let path = self.migration();
306 let bytes = bincode::serialize(&self.migrated)?;
307
308 self.replace(
309 &path,
310 |_| Ok(true),
311 |mut file| {
312 file.write_all(&bytes)?;
313 Ok(())
314 },
315 )
316 }
317
318 fn replace(
328 &mut self,
329 path: &Path,
330 pred: impl FnOnce(File) -> anyhow::Result<bool>,
331 write: impl FnOnce(File) -> anyhow::Result<()>,
332 ) -> anyhow::Result<()> {
333 if path.is_file() {
334 if !pred(File::open(path)?)? {
339 return Ok(());
342 }
343 }
344
345 let mut swap_path = path.to_owned();
348 swap_path.set_extension("swp");
349 let swap = OpenOptions::new()
350 .write(true)
351 .truncate(true)
352 .create(true)
353 .open(&swap_path)?;
354 write(swap)?;
355
356 fs::rename(swap_path, path)?;
358
359 Ok(())
360 }
361
362 fn collect_garbage(
363 &mut self,
364 decided_view: ViewNumber,
365 prune_intervals: &[RangeInclusive<ViewNumber>],
366 ) -> anyhow::Result<()> {
367 let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
368
369 self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
370 self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
371 self.prune_files(
372 self.quorum_proposals2_dir_path(),
373 prune_view,
374 None,
375 prune_intervals,
376 )?;
377 self.prune_files(
378 self.state_cert_dir_path(),
379 prune_view,
380 None,
381 prune_intervals,
382 )?;
383
384 self.prune_files(
386 self.decided_leaf2_path(),
387 prune_view,
388 Some(decided_view),
389 prune_intervals,
390 )?;
391
392 Ok(())
393 }
394
395 fn prune_files(
396 &mut self,
397 dir_path: PathBuf,
398 prune_view: ViewNumber,
399 keep_decided_view: Option<ViewNumber>,
400 prune_intervals: &[RangeInclusive<ViewNumber>],
401 ) -> anyhow::Result<()> {
402 if !dir_path.is_dir() {
403 return Ok(());
404 }
405
406 for (file_view, path) in view_files(dir_path)? {
407 if let Some(decided_view) = keep_decided_view
409 && decided_view == file_view
410 {
411 continue;
412 }
413 if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
417 fs::remove_file(&path)?;
418 }
419 }
420
421 Ok(())
422 }
423
424 fn parse_decided_leaf(
425 &self,
426 bytes: &[u8],
427 ) -> anyhow::Result<(Leaf2, CertificatePair<SeqTypes>)> {
428 match bincode::deserialize(bytes) {
432 Ok((leaf, cert)) => Ok((leaf, cert)),
433 Err(err) => {
434 tracing::info!(
435 "error parsing decided leaf, maybe file was created without next epoch QC? \
436 {err}"
437 );
438 let (leaf, qc) =
439 bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(bytes)
440 .context("parsing decided leaf")?;
441 Ok((leaf, CertificatePair::non_epoch_change(qc)))
442 },
443 }
444 }
445
446 async fn generate_decide_events(
451 &mut self,
452 view: ViewNumber,
453 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
454 consumer: &impl EventConsumer,
455 ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
456 let mut leaves = BTreeMap::new();
460 for (v, path) in view_files(self.decided_leaf2_path())? {
461 if v > view {
462 continue;
463 }
464
465 let bytes =
466 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
467 let (mut leaf, cert) = self.parse_decided_leaf(&bytes)?;
468
469 let vid_proposal = self.load_vid_share(v)?;
471 if vid_proposal.is_none() {
472 tracing::debug!(?v, "VID share not available at decide");
473 }
474 let vid_share = vid_proposal.as_ref().map(|proposal| proposal.data.clone());
475
476 let state_cert = self.store_finalized_state_cert(v)?;
478
479 if let Some(proposal) = self.load_da_proposal(v)? {
481 let payload = Payload::from_bytes(
482 &proposal.data.encoded_transactions,
483 &proposal.data.metadata,
484 );
485 leaf.fill_block_payload_unchecked(payload);
486 } else {
487 tracing::debug!(?v, "DA proposal not available at decide");
488 }
489
490 let info = LeafInfo {
491 leaf,
492 vid_share,
493 state_cert,
494 state: Default::default(),
497 delta: Default::default(),
498 };
499
500 leaves.insert(v, (info, cert));
501 }
502
503 if let Some((oldest_view, _)) = leaves.first_key_value() {
507 if *oldest_view > ViewNumber::genesis() {
510 leaves.pop_first();
511 }
512 }
513
514 let mut intervals = vec![];
515 let mut current_interval = None;
516 for (view, (leaf, cert)) in leaves {
517 let height = leaf.leaf.block_header().block_number();
518
519 let event = if leaf.leaf.block_header().version() >= versions::NEW_PROTOCOL_VERSION {
520 let cert2 = self.load_cert2(view)?;
521 CoordinatorEvent::NewDecide {
526 leaf_infos: vec![leaf],
527 cert1: cert.qc().clone(),
528 cert2,
529 }
530 } else {
531 let deciding_qc = deciding_qc
532 .as_ref()
533 .filter(|qc| qc.view_number() == cert.view_number() + 1)
534 .cloned();
535 CoordinatorEvent::LegacyEvent(Event {
536 view_number: view,
537 event: EventType::Decide {
538 committing_qc: Arc::new(cert),
539 deciding_qc,
540 leaf_chain: Arc::new(vec![leaf]),
541 block_size: None,
542 },
543 })
544 };
545 consumer.handle_event(&event).await?;
546
547 if let Some((start, end, current_height)) = current_interval.as_mut() {
548 if height == *current_height + 1 {
549 *current_height += 1;
552 *end = view;
553 } else {
554 intervals.push(*start..=*end);
556 current_interval = Some((view, view, height));
557 }
558 } else {
559 current_interval = Some((view, view, height));
561 }
562 }
563 if let Some((start, end, _)) = current_interval {
564 intervals.push(start..=end);
565 }
566
567 Ok(intervals)
568 }
569
570 fn load_da_proposal(
571 &self,
572 view: ViewNumber,
573 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
574 let dir_path = self.da2_dir_path();
575
576 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
577
578 if !file_path.exists() {
579 return Ok(None);
580 }
581
582 let da_bytes = fs::read(file_path)?;
583
584 let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
585 bincode::deserialize(&da_bytes)?;
586 Ok(Some(da_proposal))
587 }
588
589 fn load_vid_share(
590 &self,
591 view: ViewNumber,
592 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
593 let dir_path = self.vid2_dir_path();
594
595 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
596
597 if !file_path.exists() {
598 return Ok(None);
599 }
600
601 let vid_share_bytes = fs::read(file_path)?;
602 let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
603 bincode::deserialize(&vid_share_bytes)?;
604 Ok(Some(vid_share))
605 }
606
607 fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
608 tracing::info!("Checking `Leaf2` to load the anchor leaf.");
609 if self.decided_leaf2_path().is_dir() {
610 let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
611
612 for (_, path) in view_files(self.decided_leaf2_path())? {
614 let bytes =
615 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
616 let (leaf, cert) = self.parse_decided_leaf(&bytes)?;
617 if let Some((anchor_leaf, _)) = &anchor {
618 if leaf.view_number() > anchor_leaf.view_number() {
619 anchor = Some((leaf, cert.qc().clone()));
620 }
621 } else {
622 anchor = Some((leaf, cert.qc().clone()));
623 }
624 }
625
626 return Ok(anchor);
627 }
628
629 tracing::warn!(
630 "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
631 This is very likely to fail."
632 );
633 if self.legacy_anchor_leaf_path().is_file() {
634 let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
637
638 file.seek(SeekFrom::Start(8)).context("seek")?;
640 let bytes = file
641 .bytes()
642 .collect::<Result<Vec<_>, _>>()
643 .context("read")?;
644 return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
645 }
646
647 Ok(None)
648 }
649
650 fn store_finalized_state_cert(
651 &mut self,
652 view: ViewNumber,
653 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
654 let dir_path = self.state_cert_dir_path();
655 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
656
657 if !file_path.exists() {
658 return Ok(None);
659 }
660
661 let bytes = fs::read(&file_path)?;
662
663 let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
664 bincode::deserialize(&bytes).or_else(|err_v2| {
665 tracing::info!(
666 error = %err_v2,
667 path = %file_path.display(),
668 "Failed to deserialize state certificate, attempting with v1"
669 );
670
671 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
672 .map(Into::into)
673 .with_context(|| {
674 format!(
675 "Failed to deserialize with both v2 and v1 from file '{}'. error: \
676 {err_v2}",
677 file_path.display()
678 )
679 })
680 })?;
681
682 let epoch = state_cert.epoch.u64();
683 let finalized_dir_path = self.finalized_state_cert_dir_path();
684 fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;
685
686 let finalized_file_path = finalized_dir_path
687 .join(epoch.to_string())
688 .with_extension("txt");
689
690 self.replace(
691 &finalized_file_path,
692 |_| Ok(true),
693 |mut file| {
694 file.write_all(&bytes)?;
695 Ok(())
696 },
697 )
698 .context(format!(
699 "finalizing light client state update certificate file for epoch {epoch:?}"
700 ))?;
701
702 Ok(Some(state_cert))
703 }
704}
705
706#[async_trait]
707impl SequencerPersistence for Persistence {
708 async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
709 Ok(())
710 }
711
712 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
713 let inner = self.inner.read().await;
714 let path = inner.config_path();
715 if !path.is_file() {
716 tracing::info!("config not found at {}", path.display());
717 return Ok(None);
718 }
719 tracing::info!("loading config from {}", path.display());
720
721 let bytes =
722 fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
723 let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
724 let json = migrate_network_config(json).context("migration of network config failed")?;
725 let config = serde_json::from_value(json).context("malformed config file")?;
726 Ok(Some(config))
727 }
728
729 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
730 let inner = self.inner.write().await;
731 let path = inner.config_path();
732 tracing::info!("saving config to {}", path.display());
733 Ok(cfg.to_file(path.display().to_string())?)
734 }
735
736 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
737 let inner = self.inner.read().await;
738 let path = inner.voted_view_path();
739 if !path.is_file() {
740 return Ok(None);
741 }
742 let bytes = fs::read(inner.voted_view_path())?
743 .try_into()
744 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
745 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
746 }
747
748 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
749 let inner = self.inner.read().await;
750 let path = inner.restart_view_path();
751 if !path.is_file() {
752 return Ok(None);
753 }
754 let bytes = fs::read(path)?
755 .try_into()
756 .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
757 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
758 }
759
760 async fn append_decided_leaves(
761 &self,
762 view: ViewNumber,
763 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
764 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
765 consumer: &impl EventConsumer,
766 ) -> anyhow::Result<()> {
767 let mut inner = self.inner.write().await;
768 let path = inner.decided_leaf2_path();
769
770 fs::create_dir_all(&path).context("creating anchor leaf directory")?;
772
773 let legacy_path = inner.legacy_anchor_leaf_path();
776 if !path.is_dir() && legacy_path.is_file() {
777 tracing::info!("migrating to multi-leaf storage");
778
779 let (leaf, qc) = inner
781 .load_anchor_leaf()?
782 .context("anchor leaf file exists but unable to load contents")?;
783 let view = leaf.view_number().u64();
784 let bytes = bincode::serialize(&(leaf, qc))?;
785 let new_file = path.join(view.to_string()).with_extension("txt");
786 inner
787 .replace(
788 &new_file,
789 |_| Ok(true),
790 |mut file| {
791 file.write_all(&bytes)?;
792 Ok(())
793 },
794 )
795 .context(format!("writing anchor leaf file {view}"))?;
796
797 fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
799 }
800
801 for (info, cert) in leaf_chain {
802 let view = info.leaf.view_number().u64();
803 let file_path = path.join(view.to_string()).with_extension("txt");
804 inner.replace(
805 &file_path,
806 |_| {
807 tracing::warn!(view, "duplicate decided leaf");
810 Ok(false)
811 },
812 |mut file| {
813 let bytes = bincode::serialize(&(&info.leaf, cert))?;
814 file.write_all(&bytes)?;
815 Ok(())
816 },
817 )?;
818 }
819
820 match inner
821 .generate_decide_events(view, deciding_qc, consumer)
822 .await
823 {
824 Err(err) => {
825 tracing::warn!(?view, "event processing failed: {err:#}");
829 },
830 Ok(intervals) => {
831 if let Err(err) = inner.collect_garbage(view, &intervals) {
832 tracing::warn!(?view, "GC failed: {err:#}");
836 }
837 },
838 }
839
840 Ok(())
841 }
842
843 async fn load_anchor_leaf(
844 &self,
845 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
846 self.inner.read().await.load_anchor_leaf()
847 }
848
849 async fn load_da_proposal(
850 &self,
851 view: ViewNumber,
852 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
853 self.inner.read().await.load_da_proposal(view)
854 }
855
856 async fn load_vid_share(
857 &self,
858 view: ViewNumber,
859 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
860 self.inner.read().await.load_vid_share(view)
861 }
862
863 async fn append_vid(
864 &self,
865 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
866 ) -> anyhow::Result<()> {
867 let mut inner = self.inner.write().await;
868 let view_number = proposal.data.view_number().u64();
869 let dir_path = inner.vid2_dir_path();
870
871 fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
872
873 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
874 inner.replace(
875 &file_path,
876 |_| {
877 tracing::warn!(view_number, "duplicate VID share");
880 Ok(false)
881 },
882 |mut file| {
883 let proposal_bytes = bincode::serialize(proposal).context("serialize proposal")?;
884 let now = Instant::now();
885 file.write_all(&proposal_bytes)?;
886 self.metrics
887 .internal_append_vid_duration
888 .add_point(now.elapsed().as_secs_f64());
889 Ok(())
890 },
891 )
892 }
893
894 async fn append_da(
895 &self,
896 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
897 _vid_commit: VidCommitment,
898 ) -> anyhow::Result<()> {
899 let mut inner = self.inner.write().await;
900 let view_number = proposal.data.view_number().u64();
901 let dir_path = inner.da_dir_path();
902
903 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
904
905 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
906 inner.replace(
907 &file_path,
908 |_| {
909 tracing::warn!(view_number, "duplicate DA proposal");
912 Ok(false)
913 },
914 |mut file| {
915 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
916 let now = Instant::now();
917 file.write_all(&proposal_bytes)?;
918 self.metrics
919 .internal_append_da_duration
920 .add_point(now.elapsed().as_secs_f64());
921 Ok(())
922 },
923 )
924 }
925 async fn record_action(
926 &self,
927 view: ViewNumber,
928 _epoch: Option<EpochNumber>,
929 action: HotShotAction,
930 ) -> anyhow::Result<()> {
931 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
933 return Ok(());
934 }
935 let mut inner = self.inner.write().await;
936 let path = &inner.voted_view_path();
937 inner.replace(
938 path,
939 |mut file| {
940 let mut bytes = vec![];
941 file.read_to_end(&mut bytes)?;
942 let bytes = bytes
943 .try_into()
944 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
945 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
946
947 Ok(saved_view < view)
949 },
950 |mut file| {
951 file.write_all(&view.u64().to_le_bytes())?;
952 Ok(())
953 },
954 )?;
955
956 if matches!(action, HotShotAction::Vote) {
957 let restart_view_path = &inner.restart_view_path();
958 let restart_view = view + 1;
959 inner.replace(
960 restart_view_path,
961 |mut file| {
962 let mut bytes = vec![];
963 file.read_to_end(&mut bytes)?;
964 let bytes = bytes
965 .try_into()
966 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
967 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
968
969 Ok(saved_view < restart_view)
971 },
972 |mut file| {
973 file.write_all(&restart_view.u64().to_le_bytes())?;
974 Ok(())
975 },
976 )?;
977 }
978 Ok(())
979 }
980
981 async fn append_quorum_proposal2(
982 &self,
983 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
984 ) -> anyhow::Result<()> {
985 let mut inner = self.inner.write().await;
986 let view_number = proposal.data.view_number().u64();
987 let dir_path = inner.quorum_proposals2_dir_path();
988
989 fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
990
991 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
992 inner.replace(
993 &file_path,
994 |_| {
995 Ok(true)
997 },
998 |mut file| {
999 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1000 let now = Instant::now();
1001 file.write_all(&proposal_bytes)?;
1002 self.metrics
1003 .internal_append_quorum2_duration
1004 .add_point(now.elapsed().as_secs_f64());
1005 Ok(())
1006 },
1007 )
1008 }
1009
1010 async fn append_cert2(
1011 &self,
1012 view: ViewNumber,
1013 cert2: Certificate2<SeqTypes>,
1014 ) -> anyhow::Result<()> {
1015 let mut inner = self.inner.write().await;
1016 let dir_path = inner.decided_cert2_dir_path();
1017 fs::create_dir_all(dir_path.clone()).context("failed to create decided_cert2 dir")?;
1018 let file_path = dir_path.join(view.u64().to_string()).with_extension("bin");
1019 inner.replace(
1020 &file_path,
1021 |_| Ok(true),
1022 |mut file| {
1023 let bytes = bincode::serialize(&cert2).context("serialize cert2")?;
1024 file.write_all(&bytes)?;
1025 Ok(())
1026 },
1027 )
1028 }
1029
1030 async fn load_cert2(&self, view: ViewNumber) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
1031 let inner = self.inner.read().await;
1032 let dir_path = inner.decided_cert2_dir_path();
1033 let file_path = dir_path.join(view.u64().to_string()).with_extension("bin");
1034 if !file_path.is_file() {
1035 return Ok(None);
1036 }
1037 let bytes = fs::read(&file_path).context("read cert2")?;
1038 Ok(Some(
1039 bincode::deserialize(&bytes).context("deserialize cert2")?,
1040 ))
1041 }
1042 async fn load_quorum_proposals(
1043 &self,
1044 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
1045 {
1046 let inner = self.inner.read().await;
1047
1048 let dir_path = inner.quorum_proposals2_dir_path();
1050 if !dir_path.is_dir() {
1051 return Ok(Default::default());
1052 }
1053
1054 let mut map = BTreeMap::new();
1056 for (view, path) in view_files(&dir_path)? {
1057 let proposal_bytes = fs::read(path)?;
1058 let Some(proposal) = bincode::deserialize::<
1059 Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1060 >(&proposal_bytes)
1061 .or_else(|error| {
1062 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1063 &proposal_bytes,
1064 )
1065 .map(convert_proposal)
1066 .inspect_err(|err_v3| {
1067 tracing::warn!(
1075 ?view,
1076 %error,
1077 error_v3 = %err_v3,
1078 "ignoring malformed quorum proposal file"
1079 );
1080 })
1081 })
1082 .ok() else {
1083 continue;
1084 };
1085
1086 let proposal2 = convert_proposal(proposal);
1087
1088 map.insert(view, proposal2);
1090 }
1091
1092 Ok(map)
1093 }
1094
1095 async fn load_quorum_proposal(
1096 &self,
1097 view: ViewNumber,
1098 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1099 let inner = self.inner.read().await;
1100 let dir_path = inner.quorum_proposals2_dir_path();
1101 let file_path = dir_path.join(view.to_string()).with_extension("txt");
1102 let bytes = fs::read(file_path)?;
1103 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1104 bincode::deserialize(&bytes).or_else(|error| {
1105 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1106 &bytes,
1107 )
1108 .map(convert_proposal)
1109 .context(format!(
1110 "Failed to deserialize quorum proposal for view {view:?}: {error}."
1111 ))
1112 })?;
1113 Ok(proposal)
1114 }
1115
1116 async fn load_upgrade_certificate(
1117 &self,
1118 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1119 let inner = self.inner.read().await;
1120 let path = inner.upgrade_certificate_dir_path();
1121 if !path.is_file() {
1122 return Ok(None);
1123 }
1124 let bytes = fs::read(&path).context("read")?;
1125 Ok(Some(
1126 bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1127 ))
1128 }
1129
1130 async fn store_upgrade_certificate(
1131 &self,
1132 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1133 ) -> anyhow::Result<()> {
1134 let mut inner = self.inner.write().await;
1135 let path = &inner.upgrade_certificate_dir_path();
1136 let certificate = match decided_upgrade_certificate {
1137 Some(cert) => cert,
1138 None => return Ok(()),
1139 };
1140 inner.replace(
1141 path,
1142 |_| {
1143 Ok(true)
1145 },
1146 |mut file| {
1147 let bytes =
1148 bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1149 file.write_all(&bytes)?;
1150 Ok(())
1151 },
1152 )
1153 }
1154
1155 async fn store_next_epoch_quorum_certificate(
1156 &self,
1157 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1158 ) -> anyhow::Result<()> {
1159 let mut inner = self.inner.write().await;
1160 let path = &inner.next_epoch_qc();
1161
1162 inner.replace(
1163 path,
1164 |_| {
1165 Ok(true)
1167 },
1168 |mut file| {
1169 let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1170 file.write_all(&bytes)?;
1171 Ok(())
1172 },
1173 )
1174 }
1175
1176 async fn load_next_epoch_quorum_certificate(
1177 &self,
1178 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1179 let inner = self.inner.read().await;
1180 let path = inner.next_epoch_qc();
1181 if !path.is_file() {
1182 return Ok(None);
1183 }
1184 let bytes = fs::read(&path).context("read")?;
1185 Ok(Some(
1186 bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1187 ))
1188 }
1189
1190 async fn store_eqc(
1191 &self,
1192 high_qc: QuorumCertificate2<SeqTypes>,
1193 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1194 ) -> anyhow::Result<()> {
1195 let mut inner = self.inner.write().await;
1196 let path = &inner.eqc();
1197
1198 inner.replace(
1199 path,
1200 |_| {
1201 Ok(true)
1203 },
1204 |mut file| {
1205 let bytes = bincode::serialize(&(high_qc, next_epoch_high_qc))
1206 .context("serializing next epoch qc")?;
1207 file.write_all(&bytes)?;
1208 Ok(())
1209 },
1210 )
1211 }
1212
1213 async fn load_eqc(
1214 &self,
1215 ) -> Option<(
1216 QuorumCertificate2<SeqTypes>,
1217 NextEpochQuorumCertificate2<SeqTypes>,
1218 )> {
1219 let inner = self.inner.read().await;
1220 let path = inner.eqc();
1221 if !path.is_file() {
1222 return None;
1223 }
1224 let bytes = fs::read(&path).ok()?;
1225
1226 bincode::deserialize(&bytes).ok()
1227 }
1228
1229 async fn append_da2(
1230 &self,
1231 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1232 _vid_commit: VidCommitment,
1233 ) -> anyhow::Result<()> {
1234 let mut inner = self.inner.write().await;
1235 let view_number = proposal.data.view_number().u64();
1236 let dir_path = inner.da2_dir_path();
1237
1238 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
1239
1240 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
1241 inner.replace(
1242 &file_path,
1243 |_| {
1244 tracing::warn!(view_number, "duplicate DA proposal");
1247 Ok(false)
1248 },
1249 |mut file| {
1250 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1251 let now = Instant::now();
1252 file.write_all(&proposal_bytes)?;
1253 self.metrics
1254 .internal_append_da2_duration
1255 .add_point(now.elapsed().as_secs_f64());
1256 Ok(())
1257 },
1258 )
1259 }
1260
1261 async fn append_proposal2(
1262 &self,
1263 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1264 ) -> anyhow::Result<()> {
1265 self.append_quorum_proposal2(proposal).await
1266 }
1267
1268 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1269 let mut inner = self.inner.write().await;
1270
1271 if inner.migrated.contains("anchor_leaf") {
1272 tracing::info!("decided leaves already migrated");
1273 return Ok(());
1274 }
1275
1276 let new_leaf_dir = inner.decided_leaf2_path();
1277
1278 fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2 dir")?;
1279
1280 let old_leaf_dir = inner.decided_leaf_path();
1281 if !old_leaf_dir.is_dir() {
1282 return Ok(());
1283 }
1284
1285 tracing::warn!("migrating decided leaves..");
1286 for entry in fs::read_dir(old_leaf_dir)? {
1287 let entry = entry?;
1288 let path = entry.path();
1289
1290 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1291 continue;
1292 };
1293 let Ok(view) = file.parse::<u64>() else {
1294 continue;
1295 };
1296
1297 let bytes =
1298 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1299 let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1300 .context(format!("parsing decided leaf {}", path.display()))?;
1301
1302 let leaf2: Leaf2 = leaf.into();
1303 let cert = CertificatePair::non_epoch_change(qc.to_qc2());
1304
1305 let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1306
1307 inner.replace(
1308 &new_leaf_path,
1309 |_| {
1310 tracing::warn!(view, "duplicate decided leaf");
1311 Ok(false)
1312 },
1313 |mut file| {
1314 let bytes = bincode::serialize(&(&leaf2.clone(), cert))?;
1315 file.write_all(&bytes)?;
1316 Ok(())
1317 },
1318 )?;
1319
1320 if view % 100 == 0 {
1321 tracing::info!(view, "decided leaves migration progress");
1322 }
1323 }
1324
1325 inner.migrated.insert("anchor_leaf".to_string());
1326 inner.update_migration()?;
1327 tracing::warn!("successfully migrated decided leaves");
1328 Ok(())
1329 }
1330 async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1331 let mut inner = self.inner.write().await;
1332
1333 if inner.migrated.contains("da_proposal") {
1334 tracing::info!("da proposals already migrated");
1335 return Ok(());
1336 }
1337
1338 let new_da_dir = inner.da2_dir_path();
1339
1340 fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;
1341
1342 let old_da_dir = inner.da_dir_path();
1343 if !old_da_dir.is_dir() {
1344 return Ok(());
1345 }
1346
1347 tracing::warn!("migrating da proposals..");
1348
1349 for entry in fs::read_dir(old_da_dir)? {
1350 let entry = entry?;
1351 let path = entry.path();
1352
1353 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1354 continue;
1355 };
1356 let Ok(view) = file.parse::<u64>() else {
1357 continue;
1358 };
1359
1360 let bytes =
1361 fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
1362 let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
1363 .context(format!("parsing da proposal {}", path.display()))?;
1364
1365 let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");
1366
1367 let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);
1368
1369 inner.replace(
1370 &new_da_path,
1371 |_| {
1372 tracing::warn!(view, "duplicate DA proposal 2");
1373 Ok(false)
1374 },
1375 |mut file| {
1376 let bytes = bincode::serialize(&proposal2)?;
1377 file.write_all(&bytes)?;
1378 Ok(())
1379 },
1380 )?;
1381
1382 if view % 100 == 0 {
1383 tracing::info!(view, "DA proposals migration progress");
1384 }
1385 }
1386
1387 inner.migrated.insert("da_proposal".to_string());
1388 inner.update_migration()?;
1389 tracing::warn!("successfully migrated da proposals");
1390 Ok(())
1391 }
1392 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1393 let mut inner = self.inner.write().await;
1394
1395 if inner.migrated.contains("vid_share") {
1396 tracing::info!("vid shares already migrated");
1397 return Ok(());
1398 }
1399
1400 let new_vid_dir = inner.vid2_dir_path();
1401
1402 fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;
1403
1404 let old_vid_dir = inner.vid_dir_path();
1405 if !old_vid_dir.is_dir() {
1406 return Ok(());
1407 }
1408
1409 tracing::warn!("migrating vid shares..");
1410
1411 for entry in fs::read_dir(old_vid_dir)? {
1412 let entry = entry?;
1413 let path = entry.path();
1414
1415 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1416 continue;
1417 };
1418 let Ok(view) = file.parse::<u64>() else {
1419 continue;
1420 };
1421
1422 let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
1423 let proposal =
1424 bincode::deserialize::<Proposal<SeqTypes, VidDisperseShare0<SeqTypes>>>(&bytes)
1425 .context(format!("parsing vid share {}", path.display()))?;
1426
1427 let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");
1428
1429 let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1430 convert_proposal(proposal);
1431
1432 inner.replace(
1433 &new_vid_path,
1434 |_| {
1435 tracing::warn!(view, "duplicate VID share ");
1436 Ok(false)
1437 },
1438 |mut file| {
1439 let bytes = bincode::serialize(&proposal2)?;
1440 file.write_all(&bytes)?;
1441 Ok(())
1442 },
1443 )?;
1444
1445 if view % 100 == 0 {
1446 tracing::info!(view, "VID shares migration progress");
1447 }
1448 }
1449
1450 inner.migrated.insert("vid_share".to_string());
1451 inner.update_migration()?;
1452 tracing::warn!("successfully migrated vid shares");
1453 Ok(())
1454 }
1455
1456 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1457 let mut inner = self.inner.write().await;
1458
1459 if inner.migrated.contains("quorum_proposals") {
1460 tracing::info!("quorum proposals already migrated");
1461 return Ok(());
1462 }
1463
1464 let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();
1465
1466 fs::create_dir_all(new_quorum_proposals_dir.clone())
1467 .context("failed to create quorum proposals 2 dir")?;
1468
1469 let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
1470 if !old_quorum_proposals_dir.is_dir() {
1471 tracing::info!("no existing quorum proposals found for migration");
1472 return Ok(());
1473 }
1474
1475 tracing::warn!("migrating quorum proposals..");
1476 for entry in fs::read_dir(old_quorum_proposals_dir)? {
1477 let entry = entry?;
1478 let path = entry.path();
1479
1480 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1481 continue;
1482 };
1483 let Ok(view) = file.parse::<u64>() else {
1484 continue;
1485 };
1486
1487 let bytes =
1488 fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
1489 let proposal =
1490 bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
1491 .context(format!("parsing quorum proposal {}", path.display()))?;
1492
1493 let new_file_path = new_quorum_proposals_dir
1494 .join(view.to_string())
1495 .with_extension("txt");
1496
1497 let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1498 convert_proposal(proposal);
1499
1500 inner.replace(
1501 &new_file_path,
1502 |_| {
1503 tracing::warn!(view, "duplicate Quorum proposal2 ");
1504 Ok(false)
1505 },
1506 |mut file| {
1507 let bytes = bincode::serialize(&proposal2)?;
1508 file.write_all(&bytes)?;
1509 Ok(())
1510 },
1511 )?;
1512
1513 if view % 100 == 0 {
1514 tracing::info!(view, "Quorum proposals migration progress");
1515 }
1516 }
1517
1518 inner.migrated.insert("quorum_proposals".to_string());
1519 inner.update_migration()?;
1520 tracing::warn!("successfully migrated quorum proposals");
1521 Ok(())
1522 }
1523 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1524 Ok(())
1525 }
1526
1527 async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
1528 let mut inner = self.inner.write().await;
1529
1530 if inner.migrated.contains("x25519_keys") {
1531 tracing::info!("x25519_keys migration already complete");
1532 return Ok(());
1533 }
1534
1535 let path = inner.stake_table_dir_path();
1536 if !path.is_dir() {
1537 inner.migrated.insert("x25519_keys".to_string());
1538 inner.update_migration()?;
1539 return Ok(());
1540 }
1541
1542 tracing::warn!("migrating stake tables to add x25519 key fields...");
1543
1544 for (epoch, file_path) in epoch_files(&path)? {
1545 let bytes = fs::read(&file_path).with_context(|| {
1546 format!("failed to read stake table file at {}", file_path.display())
1547 })?;
1548
1549 let (stake, needs_rewrite) = deserialize_stake_table(&bytes).with_context(|| {
1550 format!(
1551 "failed to deserialize stake table at {}",
1552 file_path.display()
1553 )
1554 })?;
1555
1556 if !needs_rewrite {
1557 continue;
1558 }
1559
1560 let new_bytes = bincode::serialize(&stake)?;
1562 let tmp_path = file_path.with_extension("txt.tmp");
1563 fs::write(&tmp_path, new_bytes)?;
1564 fs::rename(&tmp_path, &file_path)?;
1565
1566 tracing::info!(?epoch, "migrated stake table");
1567 }
1568
1569 let validators_dir = path.join("validators");
1571 if validators_dir.is_dir() {
1572 type LegacyJsonMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;
1573
1574 for entry in fs::read_dir(&validators_dir)? {
1575 let entry = entry?;
1576 let file_path = entry.path();
1577 if file_path.extension().is_some_and(|ext| ext == "json") {
1578 let content = fs::read_to_string(&file_path)?;
1579
1580 if serde_json::from_str::<RegisteredValidatorMap>(&content).is_ok() {
1582 continue;
1583 }
1584
1585 let legacy: LegacyJsonMap =
1587 serde_json::from_str(&content).with_context(|| {
1588 format!(
1589 "failed to deserialize validators at {} (tried both formats)",
1590 file_path.display()
1591 )
1592 })?;
1593
1594 let migrated: RegisteredValidatorMap = legacy
1595 .into_iter()
1596 .map(|(addr, v)| (addr, v.migrate()))
1597 .collect();
1598
1599 let new_json = serde_json::to_string_pretty(&migrated)?;
1601 let tmp_path = file_path.with_extension("json.tmp");
1602 fs::write(&tmp_path, new_json)?;
1603 fs::rename(&tmp_path, &file_path)?;
1604
1605 tracing::info!(?file_path, "migrated validators file");
1606 }
1607 }
1608 }
1609
1610 inner.migrated.insert("x25519_keys".to_string());
1611 inner.update_migration()?;
1612 tracing::warn!("x25519_keys migration complete");
1613 Ok(())
1614 }
1615
1616 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1617 if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1618 if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
1619 tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
1620 } else if loaded_drb_input.iteration >= drb_input.iteration {
1621 anyhow::bail!(
1622 "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1623 loaded_drb_input,
1624 drb_input
1625 )
1626 }
1627 }
1628
1629 let mut inner = self.inner.write().await;
1630 let dir_path = inner.drb_dir_path();
1631
1632 fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1633
1634 let drb_input_bytes =
1635 bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1636
1637 let file_path = dir_path
1638 .join(drb_input.epoch.to_string())
1639 .with_extension("bin");
1640
1641 inner.replace(
1642 &file_path,
1643 |_| {
1644 Ok(true)
1646 },
1647 |mut file| {
1648 file.write_all(&drb_input_bytes).context(format!(
1649 "writing epoch drb_input file for epoch {:?} at {:?}",
1650 drb_input.epoch, file_path
1651 ))
1652 },
1653 )
1654 }
1655
1656 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1657 let inner = self.inner.read().await;
1658 let path = &inner.drb_dir_path();
1659 let file_path = path.join(epoch.to_string()).with_extension("bin");
1660 let bytes = fs::read(&file_path).context("read")?;
1661 Ok(bincode::deserialize(&bytes)
1662 .context(format!("failed to deserialize DrbInput for epoch {epoch}"))?)
1663 }
1664
1665 async fn store_drb_result(
1666 &self,
1667 epoch: EpochNumber,
1668 drb_result: DrbResult,
1669 ) -> anyhow::Result<()> {
1670 let mut inner = self.inner.write().await;
1671 let dir_path = inner.epoch_drb_result_dir_path();
1672
1673 fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1674
1675 let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1676
1677 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1678
1679 inner.replace(
1680 &file_path,
1681 |_| {
1682 Ok(true)
1684 },
1685 |mut file| {
1686 file.write_all(&drb_result_bytes)
1687 .context(format!("writing epoch drb result file for epoch {epoch:?}"))
1688 },
1689 )
1690 }
1691
1692 async fn store_epoch_root(
1693 &self,
1694 epoch: EpochNumber,
1695 block_header: <SeqTypes as NodeType>::BlockHeader,
1696 ) -> anyhow::Result<()> {
1697 let mut inner = self.inner.write().await;
1698 let dir_path = inner.epoch_root_block_header_dir_path();
1699
1700 fs::create_dir_all(dir_path.clone())
1701 .context("failed to create epoch root block header dir")?;
1702
1703 let block_header_bytes =
1704 bincode::serialize(&block_header).context("serialize block header")?;
1705
1706 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1707 inner
1708 .replace(
1709 &file_path,
1710 |_| Ok(true),
1711 |mut file| {
1712 file.write_all(&block_header_bytes)?;
1713 Ok(())
1714 },
1715 )
1716 .context(format!(
1717 "writing epoch root block header file for epoch {epoch:?}"
1718 ))?;
1719
1720 Ok(())
1721 }
1722
1723 async fn add_state_cert(
1724 &self,
1725 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1726 ) -> anyhow::Result<()> {
1727 let mut inner = self.inner.write().await;
1728 let view = state_cert.light_client_state.view_number;
1730 let dir_path = inner.state_cert_dir_path();
1731
1732 fs::create_dir_all(dir_path.clone())
1733 .context("failed to create light client state update certificate dir")?;
1734
1735 let bytes = bincode::serialize(&state_cert)
1736 .context("serialize light client state update certificate")?;
1737
1738 let file_path = dir_path.join(view.to_string()).with_extension("txt");
1739 inner
1740 .replace(
1741 &file_path,
1742 |_| Ok(true),
1743 |mut file| {
1744 file.write_all(&bytes)?;
1745 Ok(())
1746 },
1747 )
1748 .context(format!(
1749 "writing light client state update certificate file for view {view:?}"
1750 ))?;
1751
1752 Ok(())
1753 }
1754
1755 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1756 let inner = self.inner.read().await;
1757 let drb_dir_path = inner.epoch_drb_result_dir_path();
1758 let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1759
1760 let mut result = Vec::new();
1761
1762 if !drb_dir_path.is_dir() {
1763 return Ok(Vec::new());
1764 }
1765 for (epoch, path) in epoch_files(drb_dir_path)? {
1766 let bytes =
1767 fs::read(&path).context(format!("reading epoch drb result {}", path.display()))?;
1768 let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1769 .context(format!("parsing epoch drb result {}", path.display()))?;
1770
1771 let block_header_path = block_header_dir_path
1772 .join(epoch.to_string())
1773 .with_extension("txt");
1774 let block_header = if block_header_path.is_file() {
1775 let bytes = fs::read(&block_header_path).context(format!(
1776 "reading epoch root block header {}",
1777 block_header_path.display()
1778 ))?;
1779 Some(
1780 bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes).context(
1781 format!(
1782 "parsing epoch root block header {}",
1783 block_header_path.display()
1784 ),
1785 )?,
1786 )
1787 } else {
1788 None
1789 };
1790
1791 result.push(InitializerEpochInfo::<SeqTypes> {
1792 epoch,
1793 drb_result,
1794 block_header,
1795 });
1796 }
1797
1798 result.sort_by_key(|a| a.epoch);
1799
1800 let start = result
1802 .len()
1803 .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1804 let recent = result[start..].to_vec();
1805
1806 Ok(recent)
1807 }
1808
1809 async fn load_state_cert(
1810 &self,
1811 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1812 let inner = self.inner.read().await;
1813 let dir_path = inner.finalized_state_cert_dir_path();
1814
1815 if !dir_path.is_dir() {
1816 return Ok(None);
1817 }
1818
1819 let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;
1820
1821 for (epoch, path) in epoch_files(dir_path)? {
1822 if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
1823 continue;
1824 }
1825 let bytes = fs::read(&path).context(format!(
1826 "reading light client state update certificate {}",
1827 path.display()
1828 ))?;
1829 let cert =
1830 bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1831 .or_else(|error| {
1832 tracing::info!(
1833 %error,
1834 path = %path.display(),
1835 "Failed to deserialize LightClientStateUpdateCertificateV2"
1836 );
1837
1838 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
1839 &bytes,
1840 )
1841 .map(Into::into)
1842 .with_context(|| {
1843 format!(
1844 "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1845 path.display()
1846 )
1847 })
1848 })?;
1849
1850 result = Some(cert);
1851 }
1852
1853 Ok(result)
1854 }
1855
1856 async fn get_state_cert_by_epoch(
1857 &self,
1858 epoch: u64,
1859 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1860 let inner = self.inner.read().await;
1861 let dir_path = inner.finalized_state_cert_dir_path();
1862
1863 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1864
1865 if !file_path.exists() {
1866 return Ok(None);
1867 }
1868
1869 let bytes = fs::read(&file_path).context(format!(
1870 "reading light client state update certificate {}",
1871 file_path.display()
1872 ))?;
1873
1874 let cert = bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1875 .or_else(|error| {
1876 tracing::info!(
1877 %error,
1878 path = %file_path.display(),
1879 "Failed to deserialize LightClientStateUpdateCertificateV2"
1880 );
1881
1882 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
1883 .map(Into::into)
1884 .with_context(|| {
1885 format!(
1886 "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1887 file_path.display()
1888 )
1889 })
1890 })?;
1891
1892 Ok(Some(cert))
1893 }
1894
1895 async fn insert_state_cert(
1896 &self,
1897 epoch: u64,
1898 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1899 ) -> anyhow::Result<()> {
1900 let inner = self.inner.read().await;
1901 let dir_path = inner.finalized_state_cert_dir_path();
1902
1903 fs::create_dir_all(&dir_path)
1904 .context(format!("creating state cert dir {}", dir_path.display()))?;
1905
1906 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1907 let bytes = bincode::serialize(&cert)
1908 .context("serializing light client state update certificate")?;
1909
1910 fs::write(&file_path, bytes).context(format!(
1911 "writing light client state update certificate {}",
1912 file_path.display()
1913 ))?;
1914
1915 Ok(())
1916 }
1917
1918 fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
1919 }
1921}
1922
1923#[async_trait]
1924impl MembershipPersistence for Persistence {
1925 async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
1926 let inner = self.inner.read().await;
1927 let path = &inner.stake_table_dir_path();
1928 let file_path = path.join(epoch.to_string()).with_extension("txt");
1929
1930 if !file_path.exists() {
1931 return Ok(None);
1932 }
1933
1934 let bytes = fs::read(&file_path).with_context(|| {
1935 format!("failed to read stake table file at {}", file_path.display())
1936 })?;
1937
1938 let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1939 format!(
1940 "failed to deserialize stake table at {}",
1941 file_path.display()
1942 )
1943 })?;
1944 Ok(Some(stake))
1945 }
1946
1947 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
1948 let limit = limit as usize;
1949 let inner = self.inner.read().await;
1950 let path = &inner.stake_table_dir_path();
1951 let sorted_files = epoch_files(path)?
1952 .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
1953 .take(limit);
1954 let mut validator_sets: Vec<IndexedStake> = Vec::new();
1955
1956 for (epoch, file_path) in sorted_files {
1957 let bytes = fs::read(&file_path).with_context(|| {
1958 format!("failed to read stake table file at {}", file_path.display())
1959 })?;
1960
1961 let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1962 format!(
1963 "failed to deserialize stake table at {}",
1964 file_path.display()
1965 )
1966 })?;
1967 validator_sets.push((epoch, (stake.0, stake.1), stake.2));
1968 }
1969
1970 Ok(Some(validator_sets))
1971 }
1972
1973 async fn store_stake(
1974 &self,
1975 epoch: EpochNumber,
1976 stake: AuthenticatedValidatorMap,
1977 block_reward: Option<RewardAmount>,
1978 stake_table_hash: Option<StakeTableHash>,
1979 ) -> anyhow::Result<()> {
1980 let mut inner = self.inner.write().await;
1981 let dir_path = &inner.stake_table_dir_path();
1982
1983 fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1984
1985 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1986
1987 inner.replace(
1988 &file_path,
1989 |_| {
1990 Ok(true)
1992 },
1993 |mut file| {
1994 let data: StakeTuple = (stake, block_reward, stake_table_hash);
1995 let bytes =
1996 bincode::serialize(&data).context("serializing combined stake table")?;
1997 file.write_all(&bytes)?;
1998 Ok(())
1999 },
2000 )
2001 }
2002
2003 async fn store_events(
2005 &self,
2006 to_l1_block: u64,
2007 events: Vec<(EventKey, StakeTableEvent)>,
2008 ) -> anyhow::Result<()> {
2009 let mut inner = self.inner.write().await;
2010 let dir_path = &inner.stake_table_dir_path();
2011 let events_dir = dir_path.join("events");
2012
2013 fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
2014 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
2016
2017 if last_l1_finalized_path.exists() {
2019 let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
2020 format!("Failed to read file at path: {last_l1_finalized_path:?}")
2021 })?;
2022 let mut buf = [0; 8];
2023 bytes
2024 .as_slice()
2025 .read_exact(&mut buf[..8])
2026 .with_context(|| {
2027 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
2028 })?;
2029 let persisted_l1_block = u64::from_le_bytes(buf);
2030 if persisted_l1_block > to_l1_block {
2031 tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
2032 return Ok(());
2033 }
2034 }
2035
2036 for (event_key, event) in events {
2040 let (block_number, event_index) = event_key;
2041 let filename = format!("{block_number}_{event_index}");
2043 let file_path = events_dir.join(filename).with_extension("json");
2044
2045 if file_path.exists() {
2046 continue;
2047 }
2048
2049 inner
2050 .replace(
2051 &file_path,
2052 |_| Ok(true),
2053 |file| {
2054 let writer = BufWriter::new(file);
2055
2056 serde_json::to_writer_pretty(writer, &event)?;
2057 Ok(())
2058 },
2059 )
2060 .context("Failed to write event to file")?;
2061 }
2062
2063 inner.replace(
2065 &last_l1_finalized_path,
2066 |_| Ok(true),
2067 |mut file| {
2068 let bytes = to_l1_block.to_le_bytes();
2069
2070 file.write_all(&bytes)?;
2071 tracing::debug!("updated l1 finalized ={to_l1_block:?}");
2072 Ok(())
2073 },
2074 )
2075 }
2076
2077 async fn load_events(
2088 &self,
2089 from_l1_block: u64,
2090 to_l1_block: u64,
2091 ) -> anyhow::Result<(
2092 Option<EventsPersistenceRead>,
2093 Vec<(EventKey, StakeTableEvent)>,
2094 )> {
2095 let inner = self.inner.read().await;
2096 let dir_path = inner.stake_table_dir_path();
2097 let events_dir = dir_path.join("events");
2098
2099 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
2102
2103 if !last_l1_finalized_path.exists() || !events_dir.exists() {
2104 return Ok((None, Vec::new()));
2105 }
2106
2107 let mut events = Vec::new();
2108
2109 let bytes = fs::read(&last_l1_finalized_path)
2110 .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
2111 let mut buf = [0; 8];
2112 bytes
2113 .as_slice()
2114 .read_exact(&mut buf[..8])
2115 .with_context(|| {
2116 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
2117 })?;
2118
2119 let last_processed_l1_block = u64::from_le_bytes(buf);
2120
2121 let query_l1_block = if last_processed_l1_block > to_l1_block {
2125 to_l1_block
2126 } else {
2127 last_processed_l1_block
2128 };
2129
2130 for entry in fs::read_dir(&events_dir).context("events directory")? {
2131 let entry = entry?;
2132 let path = entry.path();
2133
2134 if !entry.file_type()?.is_file() {
2135 continue;
2136 }
2137
2138 if path
2139 .extension()
2140 .context(format!("extension for path={path:?}"))?
2141 != "json"
2142 {
2143 continue;
2144 }
2145
2146 let filename = path
2147 .file_stem()
2148 .and_then(|f| f.to_str())
2149 .unwrap_or_default();
2150
2151 let parts: Vec<&str> = filename.split('_').collect();
2152 if parts.len() != 2 {
2153 continue;
2154 }
2155
2156 let block_number = parts[0].parse::<u64>()?;
2157 let log_index = parts[1].parse::<u64>()?;
2158
2159 if block_number < from_l1_block || block_number > query_l1_block {
2160 continue;
2161 }
2162
2163 let file =
2164 File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
2165 let reader = BufReader::new(file);
2166
2167 let event: StakeTableEvent = serde_json::from_reader(reader)
2168 .context(format!("Failed to deserialize event at path={path:?}"))?;
2169
2170 events.push(((block_number, log_index), event));
2171 }
2172
2173 events.sort_by_key(|(key, _)| *key);
2174
2175 if query_l1_block == to_l1_block {
2176 Ok((Some(EventsPersistenceRead::Complete), events))
2177 } else {
2178 Ok((
2179 Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
2180 events,
2181 ))
2182 }
2183 }
2184
2185 async fn delete_stake_tables(&self) -> anyhow::Result<()> {
2186 let inner = self.inner.write().await;
2187 let events_dir = inner.stake_table_dir_path().join("events");
2188 if events_dir.exists() {
2189 fs::remove_dir_all(&events_dir)
2190 .with_context(|| format!("Failed to remove events dir: {events_dir:?}"))?;
2191 }
2192 let validators_dir = inner.stake_table_dir_path().join("validators");
2193 if validators_dir.exists() {
2194 fs::remove_dir_all(&validators_dir)
2195 .with_context(|| format!("Failed to remove validators dir: {validators_dir:?}"))?;
2196 }
2197 let drb_dir = inner.epoch_drb_result_dir_path();
2198 if drb_dir.exists() {
2199 fs::remove_dir_all(&drb_dir)
2200 .with_context(|| format!("Failed to remove epoch DRB result dir: {drb_dir:?}"))?;
2201 }
2202 Ok(())
2203 }
2204
2205 async fn store_all_validators(
2206 &self,
2207 epoch: EpochNumber,
2208 all_validators: RegisteredValidatorMap,
2209 ) -> anyhow::Result<()> {
2210 let mut inner = self.inner.write().await;
2211 let dir_path = inner.stake_table_dir_path();
2212 let validators_dir = dir_path.join("validators");
2213
2214 fs::create_dir_all(&validators_dir)
2216 .with_context(|| format!("Failed to create validators dir: {validators_dir:?}"))?;
2217
2218 let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2220
2221 inner
2222 .replace(
2223 &file_path,
2224 |_| Ok(true),
2225 |file| {
2226 let writer = BufWriter::new(file);
2227
2228 serde_json::to_writer_pretty(writer, &all_validators).with_context(|| {
2229 format!("Failed to serialize validators for epoch {epoch}")
2230 })?;
2231 Ok(())
2232 },
2233 )
2234 .with_context(|| format!("Failed to write validator file: {file_path:?}"))?;
2235
2236 Ok(())
2237 }
2238
2239 async fn load_all_validators(
2240 &self,
2241 epoch: EpochNumber,
2242 offset: u64,
2243 limit: u64,
2244 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
2245 let inner = self.inner.read().await;
2246 let dir_path = inner.stake_table_dir_path();
2247 let validators_dir = dir_path.join("validators");
2248 let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2249
2250 if !file_path.exists() {
2251 bail!("Validator file not found for epoch {epoch}");
2252 }
2253
2254 let file = File::open(&file_path)
2255 .with_context(|| format!("Failed to open validator file: {file_path:?}"))?;
2256 let reader = BufReader::new(file);
2257
2258 let map: RegisteredValidatorMap = serde_json::from_reader(reader).with_context(|| {
2259 format!("Failed to deserialize validators at {file_path:?}. epoch = {epoch}")
2260 })?;
2261
2262 let mut values: Vec<RegisteredValidator<PubKey>> = map.into_values().collect();
2263 values.sort_by_key(|v| v.account);
2264
2265 let start = offset as usize;
2266 let end = (start + limit as usize).min(values.len());
2267
2268 if start >= values.len() {
2269 return Ok(vec![]);
2270 }
2271
2272 Ok(values[start..end].to_vec())
2273 }
2274}
2275
2276#[async_trait]
2277impl DhtPersistentStorage for Persistence {
2278 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2284 let to_save =
2286 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2287
2288 let path = self.inner.read().await.libp2p_dht_path();
2290
2291 fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
2293 .with_context(|| "failed to create directory")?;
2294
2295 let mut inner = self.inner.write().await;
2297
2298 inner
2300 .replace(
2301 &path,
2302 |_| {
2303 Ok(true)
2305 },
2306 |mut file| {
2307 file.write_all(&to_save)
2308 .with_context(|| "failed to write records to file")?;
2309 Ok(())
2310 },
2311 )
2312 .with_context(|| "failed to save records to file")?;
2313
2314 Ok(())
2315 }
2316
2317 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2323 let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
2325 .with_context(|| "Failed to read records from file")?;
2326
2327 let records: Vec<SerializableRecord> =
2329 bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
2330
2331 Ok(records)
2332 }
2333}
2334
2335fn view_files(
2337 dir: impl AsRef<Path>,
2338) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
2339 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2340 let dir = dir.as_ref().display();
2341 let entry = entry.ok()?;
2342 if !entry.file_type().ok()?.is_file() {
2343 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2344 return None;
2345 }
2346 let path = entry.path();
2347 if path.extension()? != "txt" {
2348 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2349 return None;
2350 }
2351 let file_name = path.file_stem()?;
2352 let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2353 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2354 return None;
2355 };
2356 Some((ViewNumber::new(view_number), entry.path().to_owned()))
2357 }))
2358}
2359
2360fn epoch_files(
2363 dir: impl AsRef<Path>,
2364) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2365 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2366 let dir = dir.as_ref().display();
2367 let entry = entry.ok()?;
2368 if !entry.file_type().ok()?.is_file() {
2369 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2370 return None;
2371 }
2372 let path = entry.path();
2373 if path.extension()? != "txt" {
2374 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2375 return None;
2376 }
2377 let file_name = path.file_stem()?;
2378 let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2379 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2380 return None;
2381 };
2382 Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2383 }))
2384}
2385
2386#[cfg(test)]
2387mod test {
2388 use std::marker::PhantomData;
2389
2390 use committable::{Commitment, CommitmentBoundsArkless, Committable};
2391 use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2392 use hotshot::types::SignatureKey;
2393 use hotshot_example_types::node_types::TEST_VERSIONS;
2394 use hotshot_query_service::testing::mocks::MOCK_UPGRADE;
2395 use hotshot_types::{
2396 data::QuorumProposal2,
2397 light_client::LightClientState,
2398 simple_certificate::QuorumCertificate,
2399 simple_vote::{QuorumData, Vote2Data},
2400 traits::{EncodeBytes, block_contents::GENESIS_VID_NUM_STORAGE_NODES},
2401 vid::advz::advz_scheme,
2402 };
2403 use jf_advz::VidScheme;
2404 use serde_json::json;
2405 use tempfile::TempDir;
2406
2407 use super::*;
2408 use crate::{BLSPubKey, persistence::tests::TestablePersistence};
2409
2410 #[async_trait]
2411 impl TestablePersistence for Persistence {
2412 type Storage = TempDir;
2413
2414 async fn tmp_storage() -> Self::Storage {
2415 TempDir::new().unwrap()
2416 }
2417
2418 fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
2419 Options::new(storage.path().into())
2420 }
2421 }
2422
2423 #[test]
2424 fn test_config_migrations_add_builder_urls() {
2425 let before = json!({
2426 "config": {
2427 "builder_url": "https://test:8080",
2428 "start_proposing_view": 1,
2429 "stop_proposing_view": 2,
2430 "start_voting_view": 1,
2431 "stop_voting_view": 2,
2432 "start_proposing_time": 1,
2433 "stop_proposing_time": 2,
2434 "start_voting_time": 1,
2435 "stop_voting_time": 2
2436 }
2437 });
2438 let after = json!({
2439 "config": {
2440 "builder_urls": ["https://test:8080"],
2441 "start_proposing_view": 1,
2442 "stop_proposing_view": 2,
2443 "start_voting_view": 1,
2444 "stop_voting_view": 2,
2445 "start_proposing_time": 1,
2446 "stop_proposing_time": 2,
2447 "start_voting_time": 1,
2448 "stop_voting_time": 2,
2449 "epoch_height": 0,
2450 "drb_difficulty": 0,
2451 "drb_upgrade_difficulty": 0,
2452 "da_committees": [],
2453 }
2454 });
2455
2456 assert_eq!(migrate_network_config(before).unwrap(), after);
2457 }
2458
2459 #[test]
2460 fn test_config_migrations_existing_builder_urls() {
2461 let before = json!({
2462 "config": {
2463 "builder_urls": ["https://test:8080", "https://test:8081"],
2464 "start_proposing_view": 1,
2465 "stop_proposing_view": 2,
2466 "start_voting_view": 1,
2467 "stop_voting_view": 2,
2468 "start_proposing_time": 1,
2469 "stop_proposing_time": 2,
2470 "start_voting_time": 1,
2471 "stop_voting_time": 2,
2472 "epoch_height": 0,
2473 "drb_difficulty": 0,
2474 "drb_upgrade_difficulty": 0,
2475 "da_committees": [],
2476 }
2477 });
2478
2479 assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2480 }
2481
2482 #[test]
2483 fn test_config_migrations_add_upgrade_params() {
2484 let before = json!({
2485 "config": {
2486 "builder_urls": ["https://test:8080", "https://test:8081"]
2487 }
2488 });
2489 let after = json!({
2490 "config": {
2491 "builder_urls": ["https://test:8080", "https://test:8081"],
2492 "start_proposing_view": 9007199254740991u64,
2493 "stop_proposing_view": 0,
2494 "start_voting_view": 9007199254740991u64,
2495 "stop_voting_view": 0,
2496 "start_proposing_time": 9007199254740991u64,
2497 "stop_proposing_time": 0,
2498 "start_voting_time": 9007199254740991u64,
2499 "stop_voting_time": 0,
2500 "epoch_height": 0,
2501 "drb_difficulty": 0,
2502 "drb_upgrade_difficulty": 0,
2503 "da_committees": [],
2504 }
2505 });
2506
2507 assert_eq!(migrate_network_config(before).unwrap(), after);
2508 }
2509
2510 #[test]
2511 fn test_config_migrations_existing_upgrade_params() {
2512 let before = json!({
2513 "config": {
2514 "builder_urls": ["https://test:8080", "https://test:8081"],
2515 "start_proposing_view": 1,
2516 "stop_proposing_view": 2,
2517 "start_voting_view": 1,
2518 "stop_voting_view": 2,
2519 "start_proposing_time": 1,
2520 "stop_proposing_time": 2,
2521 "start_voting_time": 1,
2522 "stop_voting_time": 2,
2523 "epoch_height": 0,
2524 "drb_difficulty": 0,
2525 "drb_upgrade_difficulty": 0,
2526 "da_committees": [],
2527 }
2528 });
2529
2530 assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2531 }
2532
2533 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2534 pub async fn test_consensus_migration() {
2535 let rows = 300;
2536 let tmp = Persistence::tmp_storage().await;
2537 let mut opt = Persistence::options(&tmp);
2538 let storage = opt.create().await.unwrap();
2539
2540 let inner = storage.inner.read().await;
2541
2542 let decided_leaves_path = inner.decided_leaf_path();
2543 fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create proposals dir");
2544
2545 let qp_dir_path = inner.quorum_proposals_dir_path();
2546 fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");
2547
2548 let state_cert_dir_path = inner.state_cert_dir_path();
2549 fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
2550 drop(inner);
2551
2552 assert!(storage.load_state_cert().await.unwrap().is_none());
2553
2554 for i in 0..rows {
2555 let view = ViewNumber::new(i);
2556 let validated_state = ValidatedState::default();
2557 let instance_state = NodeState::default();
2558
2559 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
2560 let (payload, metadata) =
2561 Payload::from_transactions([], &validated_state, &instance_state)
2562 .await
2563 .unwrap();
2564
2565 let payload_bytes = payload.encode();
2566
2567 let block_header = Header::genesis(
2568 &instance_state,
2569 payload.clone(),
2570 &metadata,
2571 TEST_VERSIONS.test.base,
2572 );
2573
2574 let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
2575 epoch: EpochNumber::new(i),
2576 light_client_state: LightClientState {
2577 view_number: i,
2578 block_height: i,
2579 block_comm_root: Default::default(),
2580 },
2581 next_stake_table_state: Default::default(),
2582 signatures: vec![], auth_root: Default::default(),
2584 };
2585 assert!(storage.add_state_cert(state_cert).await.is_ok());
2586
2587 let null_quorum_data = QuorumData {
2588 leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
2589 };
2590
2591 let justify_qc = QuorumCertificate::new(
2592 null_quorum_data.clone(),
2593 null_quorum_data.commit(),
2594 view,
2595 None,
2596 PhantomData,
2597 );
2598
2599 let quorum_proposal = QuorumProposal {
2600 block_header,
2601 view_number: view,
2602 justify_qc: justify_qc.clone(),
2603 upgrade_certificate: None,
2604 proposal_certificate: None,
2605 };
2606
2607 let quorum_proposal_signature =
2608 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2609 .expect("Failed to sign quorum proposal");
2610
2611 let proposal = Proposal {
2612 data: quorum_proposal.clone(),
2613 signature: quorum_proposal_signature,
2614 _pd: PhantomData::<SeqTypes>,
2615 };
2616
2617 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
2618 leaf.fill_block_payload(
2619 payload,
2620 GENESIS_VID_NUM_STORAGE_NODES,
2621 TEST_VERSIONS.test.base,
2622 )
2623 .unwrap();
2624
2625 let mut inner = storage.inner.write().await;
2626
2627 tracing::debug!("inserting decided leaves");
2628 let file_path = decided_leaves_path
2629 .join(view.to_string())
2630 .with_extension("txt");
2631
2632 tracing::debug!("inserting decided leaves");
2633
2634 inner
2635 .replace(
2636 &file_path,
2637 |_| Ok(true),
2638 |mut file| {
2639 let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
2640 file.write_all(&bytes)?;
2641 Ok(())
2642 },
2643 )
2644 .expect("replace decided leaves");
2645
2646 let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");
2647
2648 tracing::debug!("inserting qc for {view}");
2649
2650 inner
2651 .replace(
2652 &file_path,
2653 |_| Ok(true),
2654 |mut file| {
2655 let proposal_bytes =
2656 bincode::serialize(&proposal).context("serialize proposal")?;
2657
2658 file.write_all(&proposal_bytes)?;
2659 Ok(())
2660 },
2661 )
2662 .unwrap();
2663
2664 drop(inner);
2665 let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
2666 .disperse(payload_bytes.clone())
2667 .unwrap();
2668
2669 let vid = VidDisperseShare0::<SeqTypes> {
2670 view_number: ViewNumber::new(i),
2671 payload_commitment: Default::default(),
2672 share: disperse.shares[0].clone(),
2673 common: disperse.common,
2674 recipient_key: pubkey,
2675 };
2676
2677 let (payload, metadata) =
2678 Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
2679 .await
2680 .unwrap();
2681
2682 let da = DaProposal::<SeqTypes> {
2683 encoded_transactions: payload.encode(),
2684 metadata,
2685 view_number: ViewNumber::new(i),
2686 };
2687
2688 let block_payload_signature =
2689 BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
2690
2691 let da_proposal = Proposal {
2692 data: da,
2693 signature: block_payload_signature,
2694 _pd: Default::default(),
2695 };
2696
2697 tracing::debug!("inserting vid for {view}");
2698 storage
2699 .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
2700 .await
2701 .unwrap();
2702
2703 tracing::debug!("inserting da for {view}");
2704 storage
2705 .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
2706 .await
2707 .unwrap();
2708 }
2709
2710 storage.migrate_storage().await.unwrap();
2711 let inner = storage.inner.read().await;
2712 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2713 let decided_leaves_count = decided_leaves
2714 .filter_map(Result::ok)
2715 .filter(|e| e.path().is_file())
2716 .count();
2717 assert_eq!(
2718 decided_leaves_count, rows as usize,
2719 "decided leaves count does not match",
2720 );
2721
2722 let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
2723 let da_proposals_count = da_proposals
2724 .filter_map(Result::ok)
2725 .filter(|e| e.path().is_file())
2726 .count();
2727 assert_eq!(
2728 da_proposals_count, rows as usize,
2729 "da proposals does not match",
2730 );
2731
2732 let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
2733 let vids_count = vids
2734 .filter_map(Result::ok)
2735 .filter(|e| e.path().is_file())
2736 .count();
2737 assert_eq!(vids_count, rows as usize, "vid shares count does not match",);
2738
2739 let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
2740 let qps_count = qps
2741 .filter_map(Result::ok)
2742 .filter(|e| e.path().is_file())
2743 .count();
2744 assert_eq!(
2745 qps_count, rows as usize,
2746 "quorum proposals count does not match",
2747 );
2748
2749 let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
2750 let state_cert_count = state_certs
2751 .filter_map(Result::ok)
2752 .filter(|e| e.path().is_file())
2753 .count();
2754 assert_eq!(
2755 state_cert_count, rows as usize,
2756 "light client state update certificate count does not match",
2757 );
2758
2759 let storage = opt.create().await.unwrap();
2763 storage.migrate_storage().await.unwrap();
2764
2765 let inner = storage.inner.read().await;
2766 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2767 let decided_leaves_count = decided_leaves
2768 .filter_map(Result::ok)
2769 .filter(|e| e.path().is_file())
2770 .count();
2771 assert_eq!(
2772 decided_leaves_count, rows as usize,
2773 "decided leaves count does not match",
2774 );
2775 }
2776
2777 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2778 async fn test_load_quorum_proposals_invalid_extension() {
2779 let tmp = Persistence::tmp_storage().await;
2780 let storage = Persistence::connect(&tmp).await;
2781
2782 let leaf = Leaf2::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base).await;
2784 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2785 let signature = PubKey::sign(&privkey, &[]).unwrap();
2786 let mut quorum_proposal = Proposal {
2787 data: QuorumProposalWrapper::<SeqTypes> {
2788 proposal: QuorumProposal2::<SeqTypes> {
2789 epoch: None,
2790 block_header: leaf.block_header().clone(),
2791 view_number: ViewNumber::genesis(),
2792 justify_qc: QuorumCertificate2::genesis(
2793 &Default::default(),
2794 &NodeState::mock(),
2795 TEST_VERSIONS.test,
2796 )
2797 .await,
2798 upgrade_certificate: None,
2799 view_change_evidence: None,
2800 next_drb_result: None,
2801 next_epoch_justify_qc: None,
2802 state_cert: None,
2803 },
2804 },
2805 signature,
2806 _pd: Default::default(),
2807 };
2808
2809 let quorum_proposal1 = quorum_proposal.clone();
2811 storage
2812 .append_quorum_proposal2(&quorum_proposal1)
2813 .await
2814 .unwrap();
2815 quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2816 let quorum_proposal2 = quorum_proposal.clone();
2817 storage
2818 .append_quorum_proposal2(&quorum_proposal2)
2819 .await
2820 .unwrap();
2821
2822 fs::rename(
2825 tmp.path().join("quorum_proposals2/1.txt"),
2826 tmp.path().join("quorum_proposals2/1.swp"),
2827 )
2828 .unwrap();
2829
2830 assert_eq!(
2832 storage.load_quorum_proposals().await.unwrap(),
2833 [(ViewNumber::genesis(), quorum_proposal1)]
2834 .into_iter()
2835 .collect::<BTreeMap<_, _>>()
2836 );
2837 }
2838
2839 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2840 async fn test_cert2_persisted_as_bin() {
2841 let tmp = Persistence::tmp_storage().await;
2842 let storage = Persistence::connect(&tmp).await;
2843 let view = ViewNumber::new(7);
2844 let leaf = Leaf2::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base).await;
2845 let data = Vote2Data {
2846 leaf_commit: leaf.commit(),
2847 epoch: EpochNumber::new(1),
2848 block_number: leaf.height(),
2849 };
2850 let cert2 = Certificate2::new(data.clone(), data.commit(), view, None, PhantomData);
2851
2852 storage.append_cert2(view, cert2.clone()).await.unwrap();
2853
2854 assert!(tmp.path().join("decided_cert2/7.bin").is_file());
2855 assert!(!tmp.path().join("decided_cert2/7.txt").exists());
2856 assert_eq!(storage.load_cert2(view).await.unwrap(), Some(cert2));
2857 }
2858
2859 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2860 async fn test_load_quorum_proposals_malformed_data() {
2861 let tmp = Persistence::tmp_storage().await;
2862 let storage = Persistence::connect(&tmp).await;
2863
2864 let leaf: Leaf2 = Leaf::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base)
2866 .await
2867 .into();
2868 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2869 let signature = PubKey::sign(&privkey, &[]).unwrap();
2870 let quorum_proposal = Proposal {
2871 data: QuorumProposalWrapper::<SeqTypes> {
2872 proposal: QuorumProposal2::<SeqTypes> {
2873 epoch: None,
2874 block_header: leaf.block_header().clone(),
2875 view_number: ViewNumber::new(1),
2876 justify_qc: QuorumCertificate2::genesis(
2877 &Default::default(),
2878 &NodeState::mock(),
2879 TEST_VERSIONS.test,
2880 )
2881 .await,
2882 upgrade_certificate: None,
2883 view_change_evidence: None,
2884 next_drb_result: None,
2885 next_epoch_justify_qc: None,
2886 state_cert: None,
2887 },
2888 },
2889 signature,
2890 _pd: Default::default(),
2891 };
2892
2893 fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2895 fs::write(
2896 tmp.path().join("quorum_proposals2/0.txt"),
2897 "invalid data".as_bytes(),
2898 )
2899 .unwrap();
2900
2901 storage
2903 .append_quorum_proposal2(&quorum_proposal)
2904 .await
2905 .unwrap();
2906
2907 assert_eq!(
2909 storage.load_quorum_proposals().await.unwrap(),
2910 [(ViewNumber::new(1), quorum_proposal)]
2911 .into_iter()
2912 .collect::<BTreeMap<_, _>>()
2913 );
2914 }
2915
2916 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2917 async fn test_store_events_empty() {
2918 let tmp = Persistence::tmp_storage().await;
2919 let mut opt = Persistence::options(&tmp);
2920 let storage = opt.create().await.unwrap();
2921
2922 assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
2923
2924 for i in 1..=2 {
2926 tracing::info!(i, "update l1 height");
2927 storage.store_events(i, vec![]).await.unwrap();
2928 assert_eq!(
2929 storage.load_events(0, 100).await.unwrap(),
2930 (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
2931 );
2932 }
2933 }
2934
2935 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2936 async fn test_migrate_x25519_keys() {
2937 use std::collections::HashMap;
2938
2939 use alloy::primitives::{Address, U256};
2940 use indexmap::IndexMap;
2941
2942 use crate::persistence::RegisteredValidatorNoX25519;
2943
2944 let tmp = Persistence::tmp_storage().await;
2945 let mut opt = Persistence::options(&tmp);
2946 let storage = opt.create().await.unwrap();
2947
2948 let addr = Address::random();
2949 let legacy_validator = RegisteredValidatorNoX25519 {
2950 account: addr,
2951 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2952 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2953 stake: U256::from(1000),
2954 commission: 100,
2955 delegators: HashMap::new(),
2956 authenticated: true,
2957 };
2958
2959 let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2961 legacy_map.insert(addr, legacy_validator);
2962
2963 type LegacyTuple = (
2964 IndexMap<Address, RegisteredValidatorNoX25519>,
2965 Option<RewardAmount>,
2966 Option<StakeTableHash>,
2967 );
2968 let legacy_data: LegacyTuple = (legacy_map, None, None);
2969 let bytes = bincode::serialize(&legacy_data).unwrap();
2970
2971 let inner = storage.inner.read().await;
2972 let path = inner.stake_table_dir_path();
2973 drop(inner);
2974 fs::create_dir_all(&path).unwrap();
2975 fs::write(path.join("1.txt"), &bytes).unwrap();
2976
2977 let json_validator = RegisteredValidatorNoX25519 {
2979 account: addr,
2980 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2981 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2982 stake: U256::from(2000),
2983 commission: 200,
2984 delegators: HashMap::new(),
2985 authenticated: true,
2986 };
2987 let mut json_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2988 json_map.insert(addr, json_validator);
2989 let validators_dir = path.join("validators");
2990 fs::create_dir_all(&validators_dir).unwrap();
2991 let json_path = validators_dir.join("epoch_1.json");
2992 fs::write(&json_path, serde_json::to_string_pretty(&json_map).unwrap()).unwrap();
2993
2994 storage.migrate_x25519_keys().await.unwrap();
2996
2997 let result = storage.load_stake(EpochNumber::new(1)).await.unwrap();
2999 assert!(result.is_some());
3000 let (validators, reward, hash) = result.unwrap();
3001 assert_eq!(validators.len(), 1);
3002 let v = validators.get(&addr).unwrap();
3003 assert_eq!(v.stake, U256::from(1000));
3004 assert!(v.x25519_key.is_none());
3005 assert!(v.p2p_addr.is_none());
3006 assert!(reward.is_none());
3007 assert!(hash.is_none());
3008
3009 let json_content = fs::read_to_string(&json_path).unwrap();
3011 let parsed: espresso_types::RegisteredValidatorMap =
3012 serde_json::from_str(&json_content).unwrap();
3013 assert_eq!(parsed.len(), 1);
3014 let json_v = parsed.get(&addr).unwrap();
3015 assert_eq!(json_v.stake, U256::from(2000));
3016 assert!(json_v.x25519_key.is_none());
3017
3018 storage.migrate_x25519_keys().await.unwrap();
3020 let result2 = storage.load_stake(EpochNumber::new(1)).await;
3021 assert!(result2.is_ok());
3022 }
3023
3024 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3025 async fn test_migrate_x25519_keys_no_stake_dir() {
3026 let tmp = Persistence::tmp_storage().await;
3027 let mut opt = Persistence::options(&tmp);
3028 let storage = opt.create().await.unwrap();
3029
3030 storage.migrate_x25519_keys().await.unwrap();
3032
3033 let inner = storage.inner.read().await;
3035 let path = inner.stake_table_dir_path();
3036 drop(inner);
3037 fs::create_dir_all(&path).unwrap();
3038 fs::write(path.join("1.txt"), b"garbage").unwrap();
3039
3040 storage.migrate_x25519_keys().await.unwrap();
3042 }
3043
3044 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3045 async fn test_store_all_validators_authenticated_and_unauthenticated() {
3046 use std::collections::HashMap;
3047
3048 use alloy::primitives::{Address, U256};
3049 use espresso_types::v0_3::RegisteredValidator;
3050 use indexmap::IndexMap;
3051
3052 let tmp = Persistence::tmp_storage().await;
3053 let mut opt = Persistence::options(&tmp);
3054 let storage = opt.create().await.unwrap();
3055
3056 let authenticated_validator = RegisteredValidator {
3058 account: Address::random(),
3059 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
3060 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
3061 stake: U256::from(1000),
3062 commission: 100,
3063 delegators: HashMap::new(),
3064 authenticated: true,
3065 x25519_key: None,
3066 p2p_addr: None,
3067 };
3068
3069 let unauthenticated_validator = RegisteredValidator {
3071 account: Address::random(),
3072 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
3073 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
3074 stake: U256::from(2000),
3075 commission: 200,
3076 delegators: HashMap::new(),
3077 authenticated: false,
3078 x25519_key: None,
3079 p2p_addr: None,
3080 };
3081
3082 let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
3083 validators.insert(
3084 authenticated_validator.account,
3085 authenticated_validator.clone(),
3086 );
3087 validators.insert(
3088 unauthenticated_validator.account,
3089 unauthenticated_validator.clone(),
3090 );
3091
3092 storage
3094 .store_all_validators(EpochNumber::new(1), validators)
3095 .await
3096 .unwrap();
3097
3098 let loaded = storage
3100 .load_all_validators(EpochNumber::new(1), 0, 100)
3101 .await
3102 .unwrap();
3103 assert_eq!(loaded.len(), 2);
3104
3105 let loaded_auth = loaded
3107 .iter()
3108 .find(|v| v.account == authenticated_validator.account)
3109 .unwrap();
3110 assert!(
3111 loaded_auth.authenticated,
3112 "authenticated validator should remain authenticated"
3113 );
3114
3115 let loaded_unauth = loaded
3116 .iter()
3117 .find(|v| v.account == unauthenticated_validator.account)
3118 .unwrap();
3119 assert!(
3120 !loaded_unauth.authenticated,
3121 "unauthenticated validator should remain unauthenticated"
3122 );
3123 }
3124}