1use std::{
2 collections::{BTreeMap, HashSet},
3 fs::{self, File, OpenOptions},
4 io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5 ops::RangeInclusive,
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::Instant,
9};
10
11use alloy::primitives::Address;
12use anyhow::{Context, anyhow, bail};
13use async_lock::RwLock;
14use async_trait::async_trait;
15use clap::Parser;
16use espresso_types::{
17 AuthenticatedValidatorMap, Leaf, Leaf2, NetworkConfig, Payload, PubKey, RegisteredValidatorMap,
18 SeqTypes, StakeTableHash,
19 traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
20 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
21 v0_3::{
22 AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
23 StakeTableEvent,
24 },
25};
26use hotshot::InitializerEpochInfo;
27use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
28 DhtPersistentStorage, SerializableRecord,
29};
30use hotshot_types::{
31 data::{
32 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
33 QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare, VidDisperseShare0,
34 },
35 drb::{DrbInput, DrbResult},
36 event::{Event, EventType, HotShotAction, LeafInfo},
37 message::{Proposal, convert_proposal},
38 simple_certificate::{
39 CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
40 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
41 },
42 traits::{
43 block_contents::{BlockHeader, BlockPayload},
44 metrics::Metrics,
45 node_implementation::NodeType,
46 },
47 vote::HasViewNumber,
48};
49use itertools::Itertools;
50
51use super::RegisteredValidatorNoX25519;
52use crate::{
53 RECENT_STAKE_TABLES_LIMIT, ViewNumber,
54 persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
55};
56
57fn deserialize_stake_table(bytes: &[u8]) -> anyhow::Result<(StakeTuple, bool)> {
60 if let Ok(stake) = bincode::deserialize::<StakeTuple>(bytes) {
62 return Ok((stake, false));
63 }
64
65 type LegacyMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;
67 type LegacyTuple = (LegacyMap, Option<RewardAmount>, Option<StakeTableHash>);
68 let legacy: LegacyTuple = bincode::deserialize(bytes)
69 .context("failed to deserialize stake table (tried current and legacy formats)")?;
70 let migrated: AuthenticatedValidatorMap = legacy
71 .0
72 .into_iter()
73 .map(|(addr, v)| {
74 let registered = v.migrate();
75 (
76 addr,
77 AuthenticatedValidator::try_from(registered)
78 .expect("stake tables only contain authenticated validators"),
79 )
80 })
81 .collect();
82 Ok(((migrated, legacy.1, legacy.2), true))
83}
84
// Options for the file system backed persistence.
//
// NOTE(review): plain `//` comments are used on purpose; `///` doc comments on a
// `clap::Parser` struct are picked up by clap as help text.
#[derive(Parser, Clone, Debug)]
pub struct Options {
    // Root directory under which every persisted file and subdirectory is created.
    #[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
    path: PathBuf,

    // Number of views behind the latest decided view that garbage collection keeps.
    // Per-view files older than `decided_view - consensus_view_retention` are pruned.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
        default_value = "130000"
    )]
    pub(crate) consensus_view_retention: u64,
}
110
111impl Default for Options {
112 fn default() -> Self {
113 Self::parse_from(std::iter::empty::<String>())
114 }
115}
116
117impl Options {
118 pub fn new(path: PathBuf) -> Self {
119 Self {
120 path,
121 consensus_view_retention: 130000,
122 }
123 }
124
125 pub(crate) fn path(&self) -> &Path {
126 &self.path
127 }
128}
129
130#[async_trait]
131impl PersistenceOptions for Options {
132 type Persistence = Persistence;
133
134 fn set_view_retention(&mut self, view_retention: u64) {
135 self.consensus_view_retention = view_retention;
136 }
137
138 async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
139 let path = self.path.clone();
140 let view_retention = self.consensus_view_retention;
141
142 let migration_path = path.join("migration");
143 let migrated = if migration_path.is_file() {
144 let bytes = fs::read(&migration_path).context(format!(
145 "unable to read migration from {}",
146 migration_path.display()
147 ))?;
148 bincode::deserialize(&bytes).context("malformed migration file")?
149 } else {
150 HashSet::new()
151 };
152
153 Ok(Persistence {
154 inner: Arc::new(RwLock::new(Inner {
155 path,
156 migrated,
157 view_retention,
158 })),
159 metrics: Arc::new(PersistenceMetricsValue::default()),
160 })
161 }
162
163 async fn reset(self) -> anyhow::Result<()> {
164 todo!()
165 }
166}
167
/// Handle to the file system backed storage. Cloning yields another handle to the same state.
#[derive(Clone, Debug)]
pub struct Persistence {
    /// Storage state shared between clones, guarded by an async read/write lock.
    inner: Arc<RwLock<Inner>>,
    /// Metrics that receive the measured duration of file writes.
    metrics: Arc<PersistenceMetricsValue>,
}
178
/// Lock-protected state of the file system storage.
#[derive(Debug)]
struct Inner {
    /// Root directory; every other path is derived from it.
    path: PathBuf,
    /// Number of views behind the decided view that garbage collection keeps.
    view_retention: u64,
    /// Names of the migrations that have completed (e.g. `"anchor_leaf"`, `"da_proposal"`).
    migrated: HashSet<String>,
}
185
186impl Inner {
187 fn config_path(&self) -> PathBuf {
188 self.path.join("hotshot.cfg")
189 }
190
191 fn migration(&self) -> PathBuf {
192 self.path.join("migration")
193 }
194
195 fn voted_view_path(&self) -> PathBuf {
196 self.path.join("highest_voted_view")
197 }
198
199 fn restart_view_path(&self) -> PathBuf {
200 self.path.join("restart_view")
201 }
202
203 fn decided_leaf_path(&self) -> PathBuf {
205 self.path.join("decided_leaves")
206 }
207
208 fn decided_leaf2_path(&self) -> PathBuf {
209 self.path.join("decided_leaves2")
210 }
211
212 fn legacy_anchor_leaf_path(&self) -> PathBuf {
214 self.path.join("anchor_leaf")
215 }
216
217 fn vid_dir_path(&self) -> PathBuf {
218 self.path.join("vid")
219 }
220
221 fn vid2_dir_path(&self) -> PathBuf {
222 self.path.join("vid2")
223 }
224
225 fn da_dir_path(&self) -> PathBuf {
226 self.path.join("da")
227 }
228
229 fn drb_dir_path(&self) -> PathBuf {
230 self.path.join("drb")
231 }
232
233 fn da2_dir_path(&self) -> PathBuf {
234 self.path.join("da2")
235 }
236
237 fn quorum_proposals_dir_path(&self) -> PathBuf {
238 self.path.join("quorum_proposals")
239 }
240
241 fn quorum_proposals2_dir_path(&self) -> PathBuf {
242 self.path.join("quorum_proposals2")
243 }
244
245 fn upgrade_certificate_dir_path(&self) -> PathBuf {
246 self.path.join("upgrade_certificate")
247 }
248
249 fn stake_table_dir_path(&self) -> PathBuf {
250 self.path.join("stake_table")
251 }
252
253 fn next_epoch_qc(&self) -> PathBuf {
254 self.path.join("next_epoch_quorum_certificate")
255 }
256
257 fn eqc(&self) -> PathBuf {
258 self.path.join("eqc")
259 }
260
261 fn libp2p_dht_path(&self) -> PathBuf {
262 self.path.join("libp2p_dht")
263 }
264 fn epoch_drb_result_dir_path(&self) -> PathBuf {
265 self.path.join("epoch_drb_result")
266 }
267
268 fn epoch_root_block_header_dir_path(&self) -> PathBuf {
269 self.path.join("epoch_root_block_header")
270 }
271
272 fn finalized_state_cert_dir_path(&self) -> PathBuf {
273 self.path.join("finalized_state_cert")
274 }
275
276 fn state_cert_dir_path(&self) -> PathBuf {
277 self.path.join("state_cert")
278 }
279
280 fn update_migration(&mut self) -> anyhow::Result<()> {
281 let path = self.migration();
282 let bytes = bincode::serialize(&self.migrated)?;
283
284 self.replace(
285 &path,
286 |_| Ok(true),
287 |mut file| {
288 file.write_all(&bytes)?;
289 Ok(())
290 },
291 )
292 }
293
294 fn replace(
304 &mut self,
305 path: &Path,
306 pred: impl FnOnce(File) -> anyhow::Result<bool>,
307 write: impl FnOnce(File) -> anyhow::Result<()>,
308 ) -> anyhow::Result<()> {
309 if path.is_file() {
310 if !pred(File::open(path)?)? {
315 return Ok(());
318 }
319 }
320
321 let mut swap_path = path.to_owned();
324 swap_path.set_extension("swp");
325 let swap = OpenOptions::new()
326 .write(true)
327 .truncate(true)
328 .create(true)
329 .open(&swap_path)?;
330 write(swap)?;
331
332 fs::rename(swap_path, path)?;
334
335 Ok(())
336 }
337
338 fn collect_garbage(
339 &mut self,
340 decided_view: ViewNumber,
341 prune_intervals: &[RangeInclusive<ViewNumber>],
342 ) -> anyhow::Result<()> {
343 let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
344
345 self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
346 self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
347 self.prune_files(
348 self.quorum_proposals2_dir_path(),
349 prune_view,
350 None,
351 prune_intervals,
352 )?;
353 self.prune_files(
354 self.state_cert_dir_path(),
355 prune_view,
356 None,
357 prune_intervals,
358 )?;
359
360 self.prune_files(
362 self.decided_leaf2_path(),
363 prune_view,
364 Some(decided_view),
365 prune_intervals,
366 )?;
367
368 Ok(())
369 }
370
371 fn prune_files(
372 &mut self,
373 dir_path: PathBuf,
374 prune_view: ViewNumber,
375 keep_decided_view: Option<ViewNumber>,
376 prune_intervals: &[RangeInclusive<ViewNumber>],
377 ) -> anyhow::Result<()> {
378 if !dir_path.is_dir() {
379 return Ok(());
380 }
381
382 for (file_view, path) in view_files(dir_path)? {
383 if let Some(decided_view) = keep_decided_view
385 && decided_view == file_view
386 {
387 continue;
388 }
389 if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
393 fs::remove_file(&path)?;
394 }
395 }
396
397 Ok(())
398 }
399
400 fn parse_decided_leaf(
401 &self,
402 bytes: &[u8],
403 ) -> anyhow::Result<(Leaf2, CertificatePair<SeqTypes>)> {
404 match bincode::deserialize(bytes) {
408 Ok((leaf, cert)) => Ok((leaf, cert)),
409 Err(err) => {
410 tracing::info!(
411 "error parsing decided leaf, maybe file was created without next epoch QC? \
412 {err}"
413 );
414 let (leaf, qc) =
415 bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(bytes)
416 .context("parsing decided leaf")?;
417 Ok((leaf, CertificatePair::non_epoch_change(qc)))
418 },
419 }
420 }
421
    /// Emits a `Decide` event for stored decided leaves up to and including `view`.
    ///
    /// Every decided leaf file with a view `<= view` is loaded and enriched with the VID share,
    /// DA payload and light client state certificate stored for the same view, when present.
    /// One event per leaf is then passed to `consumer` in ascending view order.
    ///
    /// Returns the inclusive view ranges covered by the emitted leaves, split wherever the
    /// block height does not increase by exactly one; the caller passes these to
    /// `collect_garbage` as the intervals to prune.
    ///
    /// # Errors
    ///
    /// Fails if a file cannot be read or decoded, or as soon as `consumer` rejects an event
    /// (later leaves are then not emitted).
    async fn generate_decide_events(
        &mut self,
        view: ViewNumber,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
        // Keyed by view, so the later iteration is in ascending view order.
        let mut leaves = BTreeMap::new();
        for (v, path) in view_files(self.decided_leaf2_path())? {
            // Leaves newer than the decided view are not part of this decide.
            if v > view {
                continue;
            }

            let bytes =
                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
            let (mut leaf, cert) = self.parse_decided_leaf(&bytes)?;

            let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
            if vid_share.is_none() {
                tracing::debug!(?v, "VID share not available at decide");
            }

            // Side effect: a state certificate stored for this view is also copied into the
            // finalized state certificate directory.
            let state_cert = self.store_finalized_state_cert(v)?;

            if let Some(proposal) = self.load_da_proposal(v)? {
                let payload = Payload::from_bytes(
                    &proposal.data.encoded_transactions,
                    &proposal.data.metadata,
                );
                leaf.fill_block_payload_unchecked(payload);
            } else {
                tracing::debug!(?v, "DA proposal not available at decide");
            }

            let info = LeafInfo {
                leaf,
                vid_share,
                state_cert,
                state: Default::default(),
                delta: Default::default(),
            };

            leaves.insert(v, (info, cert));
        }

        // The oldest loaded leaf is dropped unless it is the genesis view.
        // NOTE(review): presumably it was already emitted by an earlier decide and is only
        // kept on disk as the anchor -- confirm.
        if let Some((oldest_view, _)) = leaves.first_key_value() {
            if *oldest_view > ViewNumber::genesis() {
                leaves.pop_first();
            }
        }

        let mut intervals = vec![];
        // (first view, last view, last block height) of the run currently being extended.
        let mut current_interval = None;
        for (view, (leaf, cert)) in leaves {
            let height = leaf.leaf.block_header().block_number();

            {
                // Attach the deciding QC only to the leaf whose certificate is for the view
                // immediately before it.
                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
                    (deciding_qc.view_number() == cert.view_number() + 1)
                        .then_some(deciding_qc.clone())
                } else {
                    None
                };

                consumer
                    .handle_event(&Event {
                        view_number: view,
                        event: EventType::Decide {
                            committing_qc: Arc::new(cert),
                            deciding_qc,
                            leaf_chain: Arc::new(vec![leaf]),
                            block_size: None,
                        },
                    })
                    .await?;
            }
            if let Some((start, end, current_height)) = current_interval.as_mut() {
                if height == *current_height + 1 {
                    // Consecutive block height: extend the current run.
                    *current_height += 1;
                    *end = view;
                } else {
                    // Gap in block heights: close the current run and start a new one.
                    intervals.push(*start..=*end);
                    current_interval = Some((view, view, height));
                }
            } else {
                current_interval = Some((view, view, height));
            }
        }
        // Close the run that was still open when the leaves ran out.
        if let Some((start, end, _)) = current_interval {
            intervals.push(start..=end);
        }

        Ok(intervals)
    }
538
539 fn load_da_proposal(
540 &self,
541 view: ViewNumber,
542 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
543 let dir_path = self.da2_dir_path();
544
545 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
546
547 if !file_path.exists() {
548 return Ok(None);
549 }
550
551 let da_bytes = fs::read(file_path)?;
552
553 let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
554 bincode::deserialize(&da_bytes)?;
555 Ok(Some(da_proposal))
556 }
557
558 fn load_vid_share(
559 &self,
560 view: ViewNumber,
561 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
562 let dir_path = self.vid2_dir_path();
563
564 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
565
566 if !file_path.exists() {
567 return Ok(None);
568 }
569
570 let vid_share_bytes = fs::read(file_path)?;
571 let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
572 bincode::deserialize(&vid_share_bytes)?;
573 Ok(Some(vid_share))
574 }
575
576 fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
577 tracing::info!("Checking `Leaf2` to load the anchor leaf.");
578 if self.decided_leaf2_path().is_dir() {
579 let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
580
581 for (_, path) in view_files(self.decided_leaf2_path())? {
583 let bytes =
584 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
585 let (leaf, cert) = self.parse_decided_leaf(&bytes)?;
586 if let Some((anchor_leaf, _)) = &anchor {
587 if leaf.view_number() > anchor_leaf.view_number() {
588 anchor = Some((leaf, cert.qc().clone()));
589 }
590 } else {
591 anchor = Some((leaf, cert.qc().clone()));
592 }
593 }
594
595 return Ok(anchor);
596 }
597
598 tracing::warn!(
599 "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
600 This is very likely to fail."
601 );
602 if self.legacy_anchor_leaf_path().is_file() {
603 let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
606
607 file.seek(SeekFrom::Start(8)).context("seek")?;
609 let bytes = file
610 .bytes()
611 .collect::<Result<Vec<_>, _>>()
612 .context("read")?;
613 return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
614 }
615
616 Ok(None)
617 }
618
619 fn store_finalized_state_cert(
620 &mut self,
621 view: ViewNumber,
622 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
623 let dir_path = self.state_cert_dir_path();
624 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
625
626 if !file_path.exists() {
627 return Ok(None);
628 }
629
630 let bytes = fs::read(&file_path)?;
631
632 let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
633 bincode::deserialize(&bytes).or_else(|err_v2| {
634 tracing::info!(
635 error = %err_v2,
636 path = %file_path.display(),
637 "Failed to deserialize state certificate, attempting with v1"
638 );
639
640 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
641 .map(Into::into)
642 .with_context(|| {
643 format!(
644 "Failed to deserialize with both v2 and v1 from file '{}'. error: \
645 {err_v2}",
646 file_path.display()
647 )
648 })
649 })?;
650
651 let epoch = state_cert.epoch.u64();
652 let finalized_dir_path = self.finalized_state_cert_dir_path();
653 fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;
654
655 let finalized_file_path = finalized_dir_path
656 .join(epoch.to_string())
657 .with_extension("txt");
658
659 self.replace(
660 &finalized_file_path,
661 |_| Ok(true),
662 |mut file| {
663 file.write_all(&bytes)?;
664 Ok(())
665 },
666 )
667 .context(format!(
668 "finalizing light client state update certificate file for epoch {epoch:?}"
669 ))?;
670
671 Ok(Some(state_cert))
672 }
673}
674
675#[async_trait]
676impl SequencerPersistence for Persistence {
    /// No-op for this backend: returns `Ok(())` without touching storage.
    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
        Ok(())
    }
680
681 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
682 let inner = self.inner.read().await;
683 let path = inner.config_path();
684 if !path.is_file() {
685 tracing::info!("config not found at {}", path.display());
686 return Ok(None);
687 }
688 tracing::info!("loading config from {}", path.display());
689
690 let bytes =
691 fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
692 let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
693 let json = migrate_network_config(json).context("migration of network config failed")?;
694 let config = serde_json::from_value(json).context("malformed config file")?;
695 Ok(Some(config))
696 }
697
698 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
699 let inner = self.inner.write().await;
700 let path = inner.config_path();
701 tracing::info!("saving config to {}", path.display());
702 Ok(cfg.to_file(path.display().to_string())?)
703 }
704
705 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
706 let inner = self.inner.read().await;
707 let path = inner.voted_view_path();
708 if !path.is_file() {
709 return Ok(None);
710 }
711 let bytes = fs::read(inner.voted_view_path())?
712 .try_into()
713 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
714 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
715 }
716
717 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
718 let inner = self.inner.read().await;
719 let path = inner.restart_view_path();
720 if !path.is_file() {
721 return Ok(None);
722 }
723 let bytes = fs::read(path)?
724 .try_into()
725 .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
726 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
727 }
728
729 async fn append_decided_leaves(
730 &self,
731 view: ViewNumber,
732 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
733 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
734 consumer: &impl EventConsumer,
735 ) -> anyhow::Result<()> {
736 let mut inner = self.inner.write().await;
737 let path = inner.decided_leaf2_path();
738
739 fs::create_dir_all(&path).context("creating anchor leaf directory")?;
741
742 let legacy_path = inner.legacy_anchor_leaf_path();
745 if !path.is_dir() && legacy_path.is_file() {
746 tracing::info!("migrating to multi-leaf storage");
747
748 let (leaf, qc) = inner
750 .load_anchor_leaf()?
751 .context("anchor leaf file exists but unable to load contents")?;
752 let view = leaf.view_number().u64();
753 let bytes = bincode::serialize(&(leaf, qc))?;
754 let new_file = path.join(view.to_string()).with_extension("txt");
755 inner
756 .replace(
757 &new_file,
758 |_| Ok(true),
759 |mut file| {
760 file.write_all(&bytes)?;
761 Ok(())
762 },
763 )
764 .context(format!("writing anchor leaf file {view}"))?;
765
766 fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
768 }
769
770 for (info, cert) in leaf_chain {
771 let view = info.leaf.view_number().u64();
772 let file_path = path.join(view.to_string()).with_extension("txt");
773 inner.replace(
774 &file_path,
775 |_| {
776 tracing::warn!(view, "duplicate decided leaf");
779 Ok(false)
780 },
781 |mut file| {
782 let bytes = bincode::serialize(&(&info.leaf, cert))?;
783 file.write_all(&bytes)?;
784 Ok(())
785 },
786 )?;
787 }
788
789 match inner
790 .generate_decide_events(view, deciding_qc, consumer)
791 .await
792 {
793 Err(err) => {
794 tracing::warn!(?view, "event processing failed: {err:#}");
798 },
799 Ok(intervals) => {
800 if let Err(err) = inner.collect_garbage(view, &intervals) {
801 tracing::warn!(?view, "GC failed: {err:#}");
805 }
806 },
807 }
808
809 Ok(())
810 }
811
812 async fn load_anchor_leaf(
813 &self,
814 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
815 self.inner.read().await.load_anchor_leaf()
816 }
817
818 async fn load_da_proposal(
819 &self,
820 view: ViewNumber,
821 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
822 self.inner.read().await.load_da_proposal(view)
823 }
824
825 async fn load_vid_share(
826 &self,
827 view: ViewNumber,
828 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
829 self.inner.read().await.load_vid_share(view)
830 }
831
832 async fn append_vid(
833 &self,
834 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
835 ) -> anyhow::Result<()> {
836 let mut inner = self.inner.write().await;
837 let view_number = proposal.data.view_number().u64();
838 let dir_path = inner.vid2_dir_path();
839
840 fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
841
842 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
843 inner.replace(
844 &file_path,
845 |_| {
846 tracing::warn!(view_number, "duplicate VID share");
849 Ok(false)
850 },
851 |mut file| {
852 let proposal_bytes = bincode::serialize(proposal).context("serialize proposal")?;
853 let now = Instant::now();
854 file.write_all(&proposal_bytes)?;
855 self.metrics
856 .internal_append_vid_duration
857 .add_point(now.elapsed().as_secs_f64());
858 Ok(())
859 },
860 )
861 }
862
863 async fn append_da(
864 &self,
865 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
866 _vid_commit: VidCommitment,
867 ) -> anyhow::Result<()> {
868 let mut inner = self.inner.write().await;
869 let view_number = proposal.data.view_number().u64();
870 let dir_path = inner.da_dir_path();
871
872 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
873
874 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
875 inner.replace(
876 &file_path,
877 |_| {
878 tracing::warn!(view_number, "duplicate DA proposal");
881 Ok(false)
882 },
883 |mut file| {
884 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
885 let now = Instant::now();
886 file.write_all(&proposal_bytes)?;
887 self.metrics
888 .internal_append_da_duration
889 .add_point(now.elapsed().as_secs_f64());
890 Ok(())
891 },
892 )
893 }
894 async fn record_action(
895 &self,
896 view: ViewNumber,
897 _epoch: Option<EpochNumber>,
898 action: HotShotAction,
899 ) -> anyhow::Result<()> {
900 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
902 return Ok(());
903 }
904 let mut inner = self.inner.write().await;
905 let path = &inner.voted_view_path();
906 inner.replace(
907 path,
908 |mut file| {
909 let mut bytes = vec![];
910 file.read_to_end(&mut bytes)?;
911 let bytes = bytes
912 .try_into()
913 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
914 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
915
916 Ok(saved_view < view)
918 },
919 |mut file| {
920 file.write_all(&view.u64().to_le_bytes())?;
921 Ok(())
922 },
923 )?;
924
925 if matches!(action, HotShotAction::Vote) {
926 let restart_view_path = &inner.restart_view_path();
927 let restart_view = view + 1;
928 inner.replace(
929 restart_view_path,
930 |mut file| {
931 let mut bytes = vec![];
932 file.read_to_end(&mut bytes)?;
933 let bytes = bytes
934 .try_into()
935 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
936 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
937
938 Ok(saved_view < restart_view)
940 },
941 |mut file| {
942 file.write_all(&restart_view.u64().to_le_bytes())?;
943 Ok(())
944 },
945 )?;
946 }
947 Ok(())
948 }
949
950 async fn append_quorum_proposal2(
951 &self,
952 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
953 ) -> anyhow::Result<()> {
954 let mut inner = self.inner.write().await;
955 let view_number = proposal.data.view_number().u64();
956 let dir_path = inner.quorum_proposals2_dir_path();
957
958 fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
959
960 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
961 inner.replace(
962 &file_path,
963 |_| {
964 Ok(true)
966 },
967 |mut file| {
968 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
969 let now = Instant::now();
970 file.write_all(&proposal_bytes)?;
971 self.metrics
972 .internal_append_quorum2_duration
973 .add_point(now.elapsed().as_secs_f64());
974 Ok(())
975 },
976 )
977 }
978 async fn load_quorum_proposals(
979 &self,
980 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
981 {
982 let inner = self.inner.read().await;
983
984 let dir_path = inner.quorum_proposals2_dir_path();
986 if !dir_path.is_dir() {
987 return Ok(Default::default());
988 }
989
990 let mut map = BTreeMap::new();
992 for (view, path) in view_files(&dir_path)? {
993 let proposal_bytes = fs::read(path)?;
994 let Some(proposal) = bincode::deserialize::<
995 Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
996 >(&proposal_bytes)
997 .or_else(|error| {
998 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
999 &proposal_bytes,
1000 )
1001 .map(convert_proposal)
1002 .inspect_err(|err_v3| {
1003 tracing::warn!(
1011 ?view,
1012 %error,
1013 error_v3 = %err_v3,
1014 "ignoring malformed quorum proposal file"
1015 );
1016 })
1017 })
1018 .ok() else {
1019 continue;
1020 };
1021
1022 let proposal2 = convert_proposal(proposal);
1023
1024 map.insert(view, proposal2);
1026 }
1027
1028 Ok(map)
1029 }
1030
1031 async fn load_quorum_proposal(
1032 &self,
1033 view: ViewNumber,
1034 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1035 let inner = self.inner.read().await;
1036 let dir_path = inner.quorum_proposals2_dir_path();
1037 let file_path = dir_path.join(view.to_string()).with_extension("txt");
1038 let bytes = fs::read(file_path)?;
1039 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1040 bincode::deserialize(&bytes).or_else(|error| {
1041 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1042 &bytes,
1043 )
1044 .map(convert_proposal)
1045 .context(format!(
1046 "Failed to deserialize quorum proposal for view {view:?}: {error}."
1047 ))
1048 })?;
1049 Ok(proposal)
1050 }
1051
1052 async fn load_upgrade_certificate(
1053 &self,
1054 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1055 let inner = self.inner.read().await;
1056 let path = inner.upgrade_certificate_dir_path();
1057 if !path.is_file() {
1058 return Ok(None);
1059 }
1060 let bytes = fs::read(&path).context("read")?;
1061 Ok(Some(
1062 bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1063 ))
1064 }
1065
1066 async fn store_upgrade_certificate(
1067 &self,
1068 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1069 ) -> anyhow::Result<()> {
1070 let mut inner = self.inner.write().await;
1071 let path = &inner.upgrade_certificate_dir_path();
1072 let certificate = match decided_upgrade_certificate {
1073 Some(cert) => cert,
1074 None => return Ok(()),
1075 };
1076 inner.replace(
1077 path,
1078 |_| {
1079 Ok(true)
1081 },
1082 |mut file| {
1083 let bytes =
1084 bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1085 file.write_all(&bytes)?;
1086 Ok(())
1087 },
1088 )
1089 }
1090
1091 async fn store_next_epoch_quorum_certificate(
1092 &self,
1093 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1094 ) -> anyhow::Result<()> {
1095 let mut inner = self.inner.write().await;
1096 let path = &inner.next_epoch_qc();
1097
1098 inner.replace(
1099 path,
1100 |_| {
1101 Ok(true)
1103 },
1104 |mut file| {
1105 let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1106 file.write_all(&bytes)?;
1107 Ok(())
1108 },
1109 )
1110 }
1111
1112 async fn load_next_epoch_quorum_certificate(
1113 &self,
1114 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1115 let inner = self.inner.read().await;
1116 let path = inner.next_epoch_qc();
1117 if !path.is_file() {
1118 return Ok(None);
1119 }
1120 let bytes = fs::read(&path).context("read")?;
1121 Ok(Some(
1122 bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1123 ))
1124 }
1125
1126 async fn store_eqc(
1127 &self,
1128 high_qc: QuorumCertificate2<SeqTypes>,
1129 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1130 ) -> anyhow::Result<()> {
1131 let mut inner = self.inner.write().await;
1132 let path = &inner.eqc();
1133
1134 inner.replace(
1135 path,
1136 |_| {
1137 Ok(true)
1139 },
1140 |mut file| {
1141 let bytes = bincode::serialize(&(high_qc, next_epoch_high_qc))
1142 .context("serializing next epoch qc")?;
1143 file.write_all(&bytes)?;
1144 Ok(())
1145 },
1146 )
1147 }
1148
1149 async fn load_eqc(
1150 &self,
1151 ) -> Option<(
1152 QuorumCertificate2<SeqTypes>,
1153 NextEpochQuorumCertificate2<SeqTypes>,
1154 )> {
1155 let inner = self.inner.read().await;
1156 let path = inner.eqc();
1157 if !path.is_file() {
1158 return None;
1159 }
1160 let bytes = fs::read(&path).ok()?;
1161
1162 bincode::deserialize(&bytes).ok()
1163 }
1164
1165 async fn append_da2(
1166 &self,
1167 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1168 _vid_commit: VidCommitment,
1169 ) -> anyhow::Result<()> {
1170 let mut inner = self.inner.write().await;
1171 let view_number = proposal.data.view_number().u64();
1172 let dir_path = inner.da2_dir_path();
1173
1174 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
1175
1176 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
1177 inner.replace(
1178 &file_path,
1179 |_| {
1180 tracing::warn!(view_number, "duplicate DA proposal");
1183 Ok(false)
1184 },
1185 |mut file| {
1186 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1187 let now = Instant::now();
1188 file.write_all(&proposal_bytes)?;
1189 self.metrics
1190 .internal_append_da2_duration
1191 .add_point(now.elapsed().as_secs_f64());
1192 Ok(())
1193 },
1194 )
1195 }
1196
    /// Stores a quorum proposal by delegating to `append_quorum_proposal2`.
    async fn append_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()> {
        self.append_quorum_proposal2(proposal).await
    }
1203
1204 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1205 let mut inner = self.inner.write().await;
1206
1207 if inner.migrated.contains("anchor_leaf") {
1208 tracing::info!("decided leaves already migrated");
1209 return Ok(());
1210 }
1211
1212 let new_leaf_dir = inner.decided_leaf2_path();
1213
1214 fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2 dir")?;
1215
1216 let old_leaf_dir = inner.decided_leaf_path();
1217 if !old_leaf_dir.is_dir() {
1218 return Ok(());
1219 }
1220
1221 tracing::warn!("migrating decided leaves..");
1222 for entry in fs::read_dir(old_leaf_dir)? {
1223 let entry = entry?;
1224 let path = entry.path();
1225
1226 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1227 continue;
1228 };
1229 let Ok(view) = file.parse::<u64>() else {
1230 continue;
1231 };
1232
1233 let bytes =
1234 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1235 let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1236 .context(format!("parsing decided leaf {}", path.display()))?;
1237
1238 let leaf2: Leaf2 = leaf.into();
1239 let cert = CertificatePair::non_epoch_change(qc.to_qc2());
1240
1241 let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1242
1243 inner.replace(
1244 &new_leaf_path,
1245 |_| {
1246 tracing::warn!(view, "duplicate decided leaf");
1247 Ok(false)
1248 },
1249 |mut file| {
1250 let bytes = bincode::serialize(&(&leaf2.clone(), cert))?;
1251 file.write_all(&bytes)?;
1252 Ok(())
1253 },
1254 )?;
1255
1256 if view % 100 == 0 {
1257 tracing::info!(view, "decided leaves migration progress");
1258 }
1259 }
1260
1261 inner.migrated.insert("anchor_leaf".to_string());
1262 inner.update_migration()?;
1263 tracing::warn!("successfully migrated decided leaves");
1264 Ok(())
1265 }
1266 async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1267 let mut inner = self.inner.write().await;
1268
1269 if inner.migrated.contains("da_proposal") {
1270 tracing::info!("da proposals already migrated");
1271 return Ok(());
1272 }
1273
1274 let new_da_dir = inner.da2_dir_path();
1275
1276 fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;
1277
1278 let old_da_dir = inner.da_dir_path();
1279 if !old_da_dir.is_dir() {
1280 return Ok(());
1281 }
1282
1283 tracing::warn!("migrating da proposals..");
1284
1285 for entry in fs::read_dir(old_da_dir)? {
1286 let entry = entry?;
1287 let path = entry.path();
1288
1289 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1290 continue;
1291 };
1292 let Ok(view) = file.parse::<u64>() else {
1293 continue;
1294 };
1295
1296 let bytes =
1297 fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
1298 let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
1299 .context(format!("parsing da proposal {}", path.display()))?;
1300
1301 let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");
1302
1303 let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);
1304
1305 inner.replace(
1306 &new_da_path,
1307 |_| {
1308 tracing::warn!(view, "duplicate DA proposal 2");
1309 Ok(false)
1310 },
1311 |mut file| {
1312 let bytes = bincode::serialize(&proposal2)?;
1313 file.write_all(&bytes)?;
1314 Ok(())
1315 },
1316 )?;
1317
1318 if view % 100 == 0 {
1319 tracing::info!(view, "DA proposals migration progress");
1320 }
1321 }
1322
1323 inner.migrated.insert("da_proposal".to_string());
1324 inner.update_migration()?;
1325 tracing::warn!("successfully migrated da proposals");
1326 Ok(())
1327 }
1328 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1329 let mut inner = self.inner.write().await;
1330
1331 if inner.migrated.contains("vid_share") {
1332 tracing::info!("vid shares already migrated");
1333 return Ok(());
1334 }
1335
1336 let new_vid_dir = inner.vid2_dir_path();
1337
1338 fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;
1339
1340 let old_vid_dir = inner.vid_dir_path();
1341 if !old_vid_dir.is_dir() {
1342 return Ok(());
1343 }
1344
1345 tracing::warn!("migrating vid shares..");
1346
1347 for entry in fs::read_dir(old_vid_dir)? {
1348 let entry = entry?;
1349 let path = entry.path();
1350
1351 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1352 continue;
1353 };
1354 let Ok(view) = file.parse::<u64>() else {
1355 continue;
1356 };
1357
1358 let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
1359 let proposal =
1360 bincode::deserialize::<Proposal<SeqTypes, VidDisperseShare0<SeqTypes>>>(&bytes)
1361 .context(format!("parsing vid share {}", path.display()))?;
1362
1363 let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");
1364
1365 let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1366 convert_proposal(proposal);
1367
1368 inner.replace(
1369 &new_vid_path,
1370 |_| {
1371 tracing::warn!(view, "duplicate VID share ");
1372 Ok(false)
1373 },
1374 |mut file| {
1375 let bytes = bincode::serialize(&proposal2)?;
1376 file.write_all(&bytes)?;
1377 Ok(())
1378 },
1379 )?;
1380
1381 if view % 100 == 0 {
1382 tracing::info!(view, "VID shares migration progress");
1383 }
1384 }
1385
1386 inner.migrated.insert("vid_share".to_string());
1387 inner.update_migration()?;
1388 tracing::warn!("successfully migrated vid shares");
1389 Ok(())
1390 }
1391
1392 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1393 let mut inner = self.inner.write().await;
1394
1395 if inner.migrated.contains("quorum_proposals") {
1396 tracing::info!("quorum proposals already migrated");
1397 return Ok(());
1398 }
1399
1400 let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();
1401
1402 fs::create_dir_all(new_quorum_proposals_dir.clone())
1403 .context("failed to create quorum proposals 2 dir")?;
1404
1405 let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
1406 if !old_quorum_proposals_dir.is_dir() {
1407 tracing::info!("no existing quorum proposals found for migration");
1408 return Ok(());
1409 }
1410
1411 tracing::warn!("migrating quorum proposals..");
1412 for entry in fs::read_dir(old_quorum_proposals_dir)? {
1413 let entry = entry?;
1414 let path = entry.path();
1415
1416 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1417 continue;
1418 };
1419 let Ok(view) = file.parse::<u64>() else {
1420 continue;
1421 };
1422
1423 let bytes =
1424 fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
1425 let proposal =
1426 bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
1427 .context(format!("parsing quorum proposal {}", path.display()))?;
1428
1429 let new_file_path = new_quorum_proposals_dir
1430 .join(view.to_string())
1431 .with_extension("txt");
1432
1433 let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1434 convert_proposal(proposal);
1435
1436 inner.replace(
1437 &new_file_path,
1438 |_| {
1439 tracing::warn!(view, "duplicate Quorum proposal2 ");
1440 Ok(false)
1441 },
1442 |mut file| {
1443 let bytes = bincode::serialize(&proposal2)?;
1444 file.write_all(&bytes)?;
1445 Ok(())
1446 },
1447 )?;
1448
1449 if view % 100 == 0 {
1450 tracing::info!(view, "Quorum proposals migration progress");
1451 }
1452 }
1453
1454 inner.migrated.insert("quorum_proposals".to_string());
1455 inner.update_migration()?;
1456 tracing::warn!("successfully migrated quorum proposals");
1457 Ok(())
1458 }
1459 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1460 Ok(())
1461 }
1462
1463 async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
1464 let mut inner = self.inner.write().await;
1465
1466 if inner.migrated.contains("x25519_keys") {
1467 tracing::info!("x25519_keys migration already complete");
1468 return Ok(());
1469 }
1470
1471 let path = inner.stake_table_dir_path();
1472 if !path.is_dir() {
1473 inner.migrated.insert("x25519_keys".to_string());
1474 inner.update_migration()?;
1475 return Ok(());
1476 }
1477
1478 tracing::warn!("migrating stake tables to add x25519 key fields...");
1479
1480 for (epoch, file_path) in epoch_files(&path)? {
1481 let bytes = fs::read(&file_path).with_context(|| {
1482 format!("failed to read stake table file at {}", file_path.display())
1483 })?;
1484
1485 let (stake, needs_rewrite) = deserialize_stake_table(&bytes).with_context(|| {
1486 format!(
1487 "failed to deserialize stake table at {}",
1488 file_path.display()
1489 )
1490 })?;
1491
1492 if !needs_rewrite {
1493 continue;
1494 }
1495
1496 let new_bytes = bincode::serialize(&stake)?;
1498 let tmp_path = file_path.with_extension("txt.tmp");
1499 fs::write(&tmp_path, new_bytes)?;
1500 fs::rename(&tmp_path, &file_path)?;
1501
1502 tracing::info!(?epoch, "migrated stake table");
1503 }
1504
1505 let validators_dir = path.join("validators");
1507 if validators_dir.is_dir() {
1508 type LegacyJsonMap = indexmap::IndexMap<Address, RegisteredValidatorNoX25519>;
1509
1510 for entry in fs::read_dir(&validators_dir)? {
1511 let entry = entry?;
1512 let file_path = entry.path();
1513 if file_path.extension().is_some_and(|ext| ext == "json") {
1514 let content = fs::read_to_string(&file_path)?;
1515
1516 if serde_json::from_str::<RegisteredValidatorMap>(&content).is_ok() {
1518 continue;
1519 }
1520
1521 let legacy: LegacyJsonMap =
1523 serde_json::from_str(&content).with_context(|| {
1524 format!(
1525 "failed to deserialize validators at {} (tried both formats)",
1526 file_path.display()
1527 )
1528 })?;
1529
1530 let migrated: RegisteredValidatorMap = legacy
1531 .into_iter()
1532 .map(|(addr, v)| (addr, v.migrate()))
1533 .collect();
1534
1535 let new_json = serde_json::to_string_pretty(&migrated)?;
1537 let tmp_path = file_path.with_extension("json.tmp");
1538 fs::write(&tmp_path, new_json)?;
1539 fs::rename(&tmp_path, &file_path)?;
1540
1541 tracing::info!(?file_path, "migrated validators file");
1542 }
1543 }
1544 }
1545
1546 inner.migrated.insert("x25519_keys".to_string());
1547 inner.update_migration()?;
1548 tracing::warn!("x25519_keys migration complete");
1549 Ok(())
1550 }
1551
1552 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1553 if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1554 if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
1555 tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
1556 } else if loaded_drb_input.iteration >= drb_input.iteration {
1557 anyhow::bail!(
1558 "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1559 loaded_drb_input,
1560 drb_input
1561 )
1562 }
1563 }
1564
1565 let mut inner = self.inner.write().await;
1566 let dir_path = inner.drb_dir_path();
1567
1568 fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1569
1570 let drb_input_bytes =
1571 bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1572
1573 let file_path = dir_path
1574 .join(drb_input.epoch.to_string())
1575 .with_extension("bin");
1576
1577 inner.replace(
1578 &file_path,
1579 |_| {
1580 Ok(true)
1582 },
1583 |mut file| {
1584 file.write_all(&drb_input_bytes).context(format!(
1585 "writing epoch drb_input file for epoch {:?} at {:?}",
1586 drb_input.epoch, file_path
1587 ))
1588 },
1589 )
1590 }
1591
1592 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1593 let inner = self.inner.read().await;
1594 let path = &inner.drb_dir_path();
1595 let file_path = path.join(epoch.to_string()).with_extension("bin");
1596 let bytes = fs::read(&file_path).context("read")?;
1597 Ok(bincode::deserialize(&bytes)
1598 .context(format!("failed to deserialize DrbInput for epoch {epoch}"))?)
1599 }
1600
1601 async fn store_drb_result(
1602 &self,
1603 epoch: EpochNumber,
1604 drb_result: DrbResult,
1605 ) -> anyhow::Result<()> {
1606 let mut inner = self.inner.write().await;
1607 let dir_path = inner.epoch_drb_result_dir_path();
1608
1609 fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1610
1611 let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1612
1613 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1614
1615 inner.replace(
1616 &file_path,
1617 |_| {
1618 Ok(true)
1620 },
1621 |mut file| {
1622 file.write_all(&drb_result_bytes)
1623 .context(format!("writing epoch drb result file for epoch {epoch:?}"))
1624 },
1625 )
1626 }
1627
1628 async fn store_epoch_root(
1629 &self,
1630 epoch: EpochNumber,
1631 block_header: <SeqTypes as NodeType>::BlockHeader,
1632 ) -> anyhow::Result<()> {
1633 let mut inner = self.inner.write().await;
1634 let dir_path = inner.epoch_root_block_header_dir_path();
1635
1636 fs::create_dir_all(dir_path.clone())
1637 .context("failed to create epoch root block header dir")?;
1638
1639 let block_header_bytes =
1640 bincode::serialize(&block_header).context("serialize block header")?;
1641
1642 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1643 inner
1644 .replace(
1645 &file_path,
1646 |_| Ok(true),
1647 |mut file| {
1648 file.write_all(&block_header_bytes)?;
1649 Ok(())
1650 },
1651 )
1652 .context(format!(
1653 "writing epoch root block header file for epoch {epoch:?}"
1654 ))?;
1655
1656 Ok(())
1657 }
1658
1659 async fn add_state_cert(
1660 &self,
1661 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1662 ) -> anyhow::Result<()> {
1663 let mut inner = self.inner.write().await;
1664 let view = state_cert.light_client_state.view_number;
1666 let dir_path = inner.state_cert_dir_path();
1667
1668 fs::create_dir_all(dir_path.clone())
1669 .context("failed to create light client state update certificate dir")?;
1670
1671 let bytes = bincode::serialize(&state_cert)
1672 .context("serialize light client state update certificate")?;
1673
1674 let file_path = dir_path.join(view.to_string()).with_extension("txt");
1675 inner
1676 .replace(
1677 &file_path,
1678 |_| Ok(true),
1679 |mut file| {
1680 file.write_all(&bytes)?;
1681 Ok(())
1682 },
1683 )
1684 .context(format!(
1685 "writing light client state update certificate file for view {view:?}"
1686 ))?;
1687
1688 Ok(())
1689 }
1690
1691 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1692 let inner = self.inner.read().await;
1693 let drb_dir_path = inner.epoch_drb_result_dir_path();
1694 let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1695
1696 let mut result = Vec::new();
1697
1698 if !drb_dir_path.is_dir() {
1699 return Ok(Vec::new());
1700 }
1701 for (epoch, path) in epoch_files(drb_dir_path)? {
1702 let bytes =
1703 fs::read(&path).context(format!("reading epoch drb result {}", path.display()))?;
1704 let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1705 .context(format!("parsing epoch drb result {}", path.display()))?;
1706
1707 let block_header_path = block_header_dir_path
1708 .join(epoch.to_string())
1709 .with_extension("txt");
1710 let block_header = if block_header_path.is_file() {
1711 let bytes = fs::read(&block_header_path).context(format!(
1712 "reading epoch root block header {}",
1713 block_header_path.display()
1714 ))?;
1715 Some(
1716 bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes).context(
1717 format!(
1718 "parsing epoch root block header {}",
1719 block_header_path.display()
1720 ),
1721 )?,
1722 )
1723 } else {
1724 None
1725 };
1726
1727 result.push(InitializerEpochInfo::<SeqTypes> {
1728 epoch,
1729 drb_result,
1730 block_header,
1731 });
1732 }
1733
1734 result.sort_by(|a, b| a.epoch.cmp(&b.epoch));
1735
1736 let start = result
1738 .len()
1739 .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1740 let recent = result[start..].to_vec();
1741
1742 Ok(recent)
1743 }
1744
1745 async fn load_state_cert(
1746 &self,
1747 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1748 let inner = self.inner.read().await;
1749 let dir_path = inner.finalized_state_cert_dir_path();
1750
1751 if !dir_path.is_dir() {
1752 return Ok(None);
1753 }
1754
1755 let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;
1756
1757 for (epoch, path) in epoch_files(dir_path)? {
1758 if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
1759 continue;
1760 }
1761 let bytes = fs::read(&path).context(format!(
1762 "reading light client state update certificate {}",
1763 path.display()
1764 ))?;
1765 let cert =
1766 bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1767 .or_else(|error| {
1768 tracing::info!(
1769 %error,
1770 path = %path.display(),
1771 "Failed to deserialize LightClientStateUpdateCertificateV2"
1772 );
1773
1774 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
1775 &bytes,
1776 )
1777 .map(Into::into)
1778 .with_context(|| {
1779 format!(
1780 "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1781 path.display()
1782 )
1783 })
1784 })?;
1785
1786 result = Some(cert);
1787 }
1788
1789 Ok(result)
1790 }
1791
1792 async fn get_state_cert_by_epoch(
1793 &self,
1794 epoch: u64,
1795 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1796 let inner = self.inner.read().await;
1797 let dir_path = inner.finalized_state_cert_dir_path();
1798
1799 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1800
1801 if !file_path.exists() {
1802 return Ok(None);
1803 }
1804
1805 let bytes = fs::read(&file_path).context(format!(
1806 "reading light client state update certificate {}",
1807 file_path.display()
1808 ))?;
1809
1810 let cert = bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1811 .or_else(|error| {
1812 tracing::info!(
1813 %error,
1814 path = %file_path.display(),
1815 "Failed to deserialize LightClientStateUpdateCertificateV2"
1816 );
1817
1818 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
1819 .map(Into::into)
1820 .with_context(|| {
1821 format!(
1822 "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1823 file_path.display()
1824 )
1825 })
1826 })?;
1827
1828 Ok(Some(cert))
1829 }
1830
1831 async fn insert_state_cert(
1832 &self,
1833 epoch: u64,
1834 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1835 ) -> anyhow::Result<()> {
1836 let inner = self.inner.read().await;
1837 let dir_path = inner.finalized_state_cert_dir_path();
1838
1839 fs::create_dir_all(&dir_path)
1840 .context(format!("creating state cert dir {}", dir_path.display()))?;
1841
1842 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1843 let bytes = bincode::serialize(&cert)
1844 .context("serializing light client state update certificate")?;
1845
1846 fs::write(&file_path, bytes).context(format!(
1847 "writing light client state update certificate {}",
1848 file_path.display()
1849 ))?;
1850
1851 Ok(())
1852 }
1853
1854 fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
1855 }
1857}
1858
1859#[async_trait]
1860impl MembershipPersistence for Persistence {
1861 async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
1862 let inner = self.inner.read().await;
1863 let path = &inner.stake_table_dir_path();
1864 let file_path = path.join(epoch.to_string()).with_extension("txt");
1865
1866 if !file_path.exists() {
1867 return Ok(None);
1868 }
1869
1870 let bytes = fs::read(&file_path).with_context(|| {
1871 format!("failed to read stake table file at {}", file_path.display())
1872 })?;
1873
1874 let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1875 format!(
1876 "failed to deserialize stake table at {}",
1877 file_path.display()
1878 )
1879 })?;
1880 Ok(Some(stake))
1881 }
1882
1883 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
1884 let limit = limit as usize;
1885 let inner = self.inner.read().await;
1886 let path = &inner.stake_table_dir_path();
1887 let sorted_files = epoch_files(path)?
1888 .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
1889 .take(limit);
1890 let mut validator_sets: Vec<IndexedStake> = Vec::new();
1891
1892 for (epoch, file_path) in sorted_files {
1893 let bytes = fs::read(&file_path).with_context(|| {
1894 format!("failed to read stake table file at {}", file_path.display())
1895 })?;
1896
1897 let stake: StakeTuple = bincode::deserialize(&bytes).with_context(|| {
1898 format!(
1899 "failed to deserialize stake table at {}",
1900 file_path.display()
1901 )
1902 })?;
1903 validator_sets.push((epoch, (stake.0, stake.1), stake.2));
1904 }
1905
1906 Ok(Some(validator_sets))
1907 }
1908
1909 async fn store_stake(
1910 &self,
1911 epoch: EpochNumber,
1912 stake: AuthenticatedValidatorMap,
1913 block_reward: Option<RewardAmount>,
1914 stake_table_hash: Option<StakeTableHash>,
1915 ) -> anyhow::Result<()> {
1916 let mut inner = self.inner.write().await;
1917 let dir_path = &inner.stake_table_dir_path();
1918
1919 fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1920
1921 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1922
1923 inner.replace(
1924 &file_path,
1925 |_| {
1926 Ok(true)
1928 },
1929 |mut file| {
1930 let data: StakeTuple = (stake, block_reward, stake_table_hash);
1931 let bytes =
1932 bincode::serialize(&data).context("serializing combined stake table")?;
1933 file.write_all(&bytes)?;
1934 Ok(())
1935 },
1936 )
1937 }
1938
1939 async fn store_events(
1941 &self,
1942 to_l1_block: u64,
1943 events: Vec<(EventKey, StakeTableEvent)>,
1944 ) -> anyhow::Result<()> {
1945 let mut inner = self.inner.write().await;
1946 let dir_path = &inner.stake_table_dir_path();
1947 let events_dir = dir_path.join("events");
1948
1949 fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
1950 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1952
1953 if last_l1_finalized_path.exists() {
1955 let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
1956 format!("Failed to read file at path: {last_l1_finalized_path:?}")
1957 })?;
1958 let mut buf = [0; 8];
1959 bytes
1960 .as_slice()
1961 .read_exact(&mut buf[..8])
1962 .with_context(|| {
1963 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1964 })?;
1965 let persisted_l1_block = u64::from_le_bytes(buf);
1966 if persisted_l1_block > to_l1_block {
1967 tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
1968 return Ok(());
1969 }
1970 }
1971
1972 for (event_key, event) in events {
1976 let (block_number, event_index) = event_key;
1977 let filename = format!("{block_number}_{event_index}");
1979 let file_path = events_dir.join(filename).with_extension("json");
1980
1981 if file_path.exists() {
1982 continue;
1983 }
1984
1985 inner
1986 .replace(
1987 &file_path,
1988 |_| Ok(true),
1989 |file| {
1990 let writer = BufWriter::new(file);
1991
1992 serde_json::to_writer_pretty(writer, &event)?;
1993 Ok(())
1994 },
1995 )
1996 .context("Failed to write event to file")?;
1997 }
1998
1999 inner.replace(
2001 &last_l1_finalized_path,
2002 |_| Ok(true),
2003 |mut file| {
2004 let bytes = to_l1_block.to_le_bytes();
2005
2006 file.write_all(&bytes)?;
2007 tracing::debug!("updated l1 finalized ={to_l1_block:?}");
2008 Ok(())
2009 },
2010 )
2011 }
2012
2013 async fn load_events(
2024 &self,
2025 from_l1_block: u64,
2026 to_l1_block: u64,
2027 ) -> anyhow::Result<(
2028 Option<EventsPersistenceRead>,
2029 Vec<(EventKey, StakeTableEvent)>,
2030 )> {
2031 let inner = self.inner.read().await;
2032 let dir_path = inner.stake_table_dir_path();
2033 let events_dir = dir_path.join("events");
2034
2035 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
2038
2039 if !last_l1_finalized_path.exists() || !events_dir.exists() {
2040 return Ok((None, Vec::new()));
2041 }
2042
2043 let mut events = Vec::new();
2044
2045 let bytes = fs::read(&last_l1_finalized_path)
2046 .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
2047 let mut buf = [0; 8];
2048 bytes
2049 .as_slice()
2050 .read_exact(&mut buf[..8])
2051 .with_context(|| {
2052 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
2053 })?;
2054
2055 let last_processed_l1_block = u64::from_le_bytes(buf);
2056
2057 let query_l1_block = if last_processed_l1_block > to_l1_block {
2061 to_l1_block
2062 } else {
2063 last_processed_l1_block
2064 };
2065
2066 for entry in fs::read_dir(&events_dir).context("events directory")? {
2067 let entry = entry?;
2068 let path = entry.path();
2069
2070 if !entry.file_type()?.is_file() {
2071 continue;
2072 }
2073
2074 if path
2075 .extension()
2076 .context(format!("extension for path={path:?}"))?
2077 != "json"
2078 {
2079 continue;
2080 }
2081
2082 let filename = path
2083 .file_stem()
2084 .and_then(|f| f.to_str())
2085 .unwrap_or_default();
2086
2087 let parts: Vec<&str> = filename.split('_').collect();
2088 if parts.len() != 2 {
2089 continue;
2090 }
2091
2092 let block_number = parts[0].parse::<u64>()?;
2093 let log_index = parts[1].parse::<u64>()?;
2094
2095 if block_number < from_l1_block || block_number > query_l1_block {
2096 continue;
2097 }
2098
2099 let file =
2100 File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
2101 let reader = BufReader::new(file);
2102
2103 let event: StakeTableEvent = serde_json::from_reader(reader)
2104 .context(format!("Failed to deserialize event at path={path:?}"))?;
2105
2106 events.push(((block_number, log_index), event));
2107 }
2108
2109 events.sort_by_key(|(key, _)| *key);
2110
2111 if query_l1_block == to_l1_block {
2112 Ok((Some(EventsPersistenceRead::Complete), events))
2113 } else {
2114 Ok((
2115 Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
2116 events,
2117 ))
2118 }
2119 }
2120
2121 async fn delete_stake_tables(&self) -> anyhow::Result<()> {
2122 let inner = self.inner.write().await;
2123 let events_dir = inner.stake_table_dir_path().join("events");
2124 if events_dir.exists() {
2125 fs::remove_dir_all(&events_dir)
2126 .with_context(|| format!("Failed to remove events dir: {events_dir:?}"))?;
2127 }
2128 let validators_dir = inner.stake_table_dir_path().join("validators");
2129 if validators_dir.exists() {
2130 fs::remove_dir_all(&validators_dir)
2131 .with_context(|| format!("Failed to remove validators dir: {validators_dir:?}"))?;
2132 }
2133 let drb_dir = inner.epoch_drb_result_dir_path();
2134 if drb_dir.exists() {
2135 fs::remove_dir_all(&drb_dir)
2136 .with_context(|| format!("Failed to remove epoch DRB result dir: {drb_dir:?}"))?;
2137 }
2138 Ok(())
2139 }
2140
2141 async fn store_all_validators(
2142 &self,
2143 epoch: EpochNumber,
2144 all_validators: RegisteredValidatorMap,
2145 ) -> anyhow::Result<()> {
2146 let mut inner = self.inner.write().await;
2147 let dir_path = inner.stake_table_dir_path();
2148 let validators_dir = dir_path.join("validators");
2149
2150 fs::create_dir_all(&validators_dir)
2152 .with_context(|| format!("Failed to create validators dir: {validators_dir:?}"))?;
2153
2154 let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2156
2157 inner
2158 .replace(
2159 &file_path,
2160 |_| Ok(true),
2161 |file| {
2162 let writer = BufWriter::new(file);
2163
2164 serde_json::to_writer_pretty(writer, &all_validators).with_context(|| {
2165 format!("Failed to serialize validators for epoch {epoch}")
2166 })?;
2167 Ok(())
2168 },
2169 )
2170 .with_context(|| format!("Failed to write validator file: {file_path:?}"))?;
2171
2172 Ok(())
2173 }
2174
2175 async fn load_all_validators(
2176 &self,
2177 epoch: EpochNumber,
2178 offset: u64,
2179 limit: u64,
2180 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
2181 let inner = self.inner.read().await;
2182 let dir_path = inner.stake_table_dir_path();
2183 let validators_dir = dir_path.join("validators");
2184 let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2185
2186 if !file_path.exists() {
2187 bail!("Validator file not found for epoch {epoch}");
2188 }
2189
2190 let file = File::open(&file_path)
2191 .with_context(|| format!("Failed to open validator file: {file_path:?}"))?;
2192 let reader = BufReader::new(file);
2193
2194 let map: RegisteredValidatorMap = serde_json::from_reader(reader).with_context(|| {
2195 format!("Failed to deserialize validators at {file_path:?}. epoch = {epoch}")
2196 })?;
2197
2198 let mut values: Vec<RegisteredValidator<PubKey>> = map.into_values().collect();
2199 values.sort_by_key(|v| v.account);
2200
2201 let start = offset as usize;
2202 let end = (start + limit as usize).min(values.len());
2203
2204 if start >= values.len() {
2205 return Ok(vec![]);
2206 }
2207
2208 Ok(values[start..end].to_vec())
2209 }
2210}
2211
2212#[async_trait]
2213impl DhtPersistentStorage for Persistence {
2214 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2220 let to_save =
2222 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2223
2224 let path = self.inner.read().await.libp2p_dht_path();
2226
2227 fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
2229 .with_context(|| "failed to create directory")?;
2230
2231 let mut inner = self.inner.write().await;
2233
2234 inner
2236 .replace(
2237 &path,
2238 |_| {
2239 Ok(true)
2241 },
2242 |mut file| {
2243 file.write_all(&to_save)
2244 .with_context(|| "failed to write records to file")?;
2245 Ok(())
2246 },
2247 )
2248 .with_context(|| "failed to save records to file")?;
2249
2250 Ok(())
2251 }
2252
2253 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2259 let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
2261 .with_context(|| "Failed to read records from file")?;
2262
2263 let records: Vec<SerializableRecord> =
2265 bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
2266
2267 Ok(records)
2268 }
2269}
2270
2271fn view_files(
2273 dir: impl AsRef<Path>,
2274) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
2275 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2276 let dir = dir.as_ref().display();
2277 let entry = entry.ok()?;
2278 if !entry.file_type().ok()?.is_file() {
2279 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2280 return None;
2281 }
2282 let path = entry.path();
2283 if path.extension()? != "txt" {
2284 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2285 return None;
2286 }
2287 let file_name = path.file_stem()?;
2288 let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2289 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2290 return None;
2291 };
2292 Some((ViewNumber::new(view_number), entry.path().to_owned()))
2293 }))
2294}
2295
2296fn epoch_files(
2299 dir: impl AsRef<Path>,
2300) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2301 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2302 let dir = dir.as_ref().display();
2303 let entry = entry.ok()?;
2304 if !entry.file_type().ok()?.is_file() {
2305 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2306 return None;
2307 }
2308 let path = entry.path();
2309 if path.extension()? != "txt" {
2310 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2311 return None;
2312 }
2313 let file_name = path.file_stem()?;
2314 let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2315 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2316 return None;
2317 };
2318 Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2319 }))
2320}
2321
2322#[cfg(test)]
2323mod test {
2324 use std::marker::PhantomData;
2325
2326 use committable::{Commitment, CommitmentBoundsArkless, Committable};
2327 use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2328 use hotshot::types::SignatureKey;
2329 use hotshot_example_types::node_types::TEST_VERSIONS;
2330 use hotshot_query_service::testing::mocks::MOCK_UPGRADE;
2331 use hotshot_types::{
2332 data::QuorumProposal2,
2333 light_client::LightClientState,
2334 simple_certificate::QuorumCertificate,
2335 simple_vote::QuorumData,
2336 traits::{EncodeBytes, block_contents::GENESIS_VID_NUM_STORAGE_NODES},
2337 vid::advz::advz_scheme,
2338 };
2339 use jf_advz::VidScheme;
2340 use serde_json::json;
2341 use tempfile::TempDir;
2342
2343 use super::*;
2344 use crate::{BLSPubKey, persistence::tests::TestablePersistence};
2345
2346 #[async_trait]
2347 impl TestablePersistence for Persistence {
2348 type Storage = TempDir;
2349
2350 async fn tmp_storage() -> Self::Storage {
2351 TempDir::new().unwrap()
2352 }
2353
2354 fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
2355 Options::new(storage.path().into())
2356 }
2357 }
2358
/// A config with a single `builder_url` is migrated to a `builder_urls` list, and the missing
/// epoch/DRB/DA fields are filled in.
#[test]
fn test_config_migrations_add_builder_urls() {
    let legacy = json!({
        "config": {
            "builder_url": "https://test:8080",
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2
        }
    });
    let expected = json!({
        "config": {
            "builder_urls": ["https://test:8080"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    let migrated = migrate_network_config(legacy).unwrap();
    assert_eq!(migrated, expected);
}
2394
/// A config that already uses the `builder_urls` list (and already has every
/// other field the other migration tests expect to be added) must pass through
/// `migrate_network_config` unchanged.
#[test]
fn test_config_migrations_existing_builder_urls() {
    let up_to_date = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    // The migration consumes its input, so hand it a copy and compare against
    // the untouched original.
    let migrated = migrate_network_config(up_to_date.clone()).unwrap();
    assert_eq!(migrated, up_to_date);
}
2417
/// A config without any upgrade parameters must have them filled in by
/// `migrate_network_config`: every `start_*` field is expected to be
/// 9007199254740991 (2^53 - 1) and every `stop_*` field 0, alongside the
/// `epoch_height`, `drb_*` and `da_committees` fields.
#[test]
fn test_config_migrations_add_upgrade_params() {
    let without_upgrade_params = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"]
        }
    });
    let expected = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 9007199254740991u64,
            "stop_proposing_view": 0,
            "start_voting_view": 9007199254740991u64,
            "stop_voting_view": 0,
            "start_proposing_time": 9007199254740991u64,
            "stop_proposing_time": 0,
            "start_voting_time": 9007199254740991u64,
            "stop_voting_time": 0,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    let migrated = migrate_network_config(without_upgrade_params).unwrap();
    assert_eq!(migrated, expected);
}
2445
/// A config whose upgrade parameters are already set must keep those exact
/// values: `migrate_network_config` is expected to return it unchanged.
#[test]
fn test_config_migrations_existing_upgrade_params() {
    let up_to_date = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    // The migration consumes its input, so hand it a copy and compare against
    // the untouched original.
    let migrated = migrate_network_config(up_to_date.clone()).unwrap();
    assert_eq!(migrated, up_to_date);
}
2468
2469 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2470 pub async fn test_consensus_migration() {
2471 let rows = 300;
2472 let tmp = Persistence::tmp_storage().await;
2473 let mut opt = Persistence::options(&tmp);
2474 let storage = opt.create().await.unwrap();
2475
2476 let inner = storage.inner.read().await;
2477
2478 let decided_leaves_path = inner.decided_leaf_path();
2479 fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create proposals dir");
2480
2481 let qp_dir_path = inner.quorum_proposals_dir_path();
2482 fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");
2483
2484 let state_cert_dir_path = inner.state_cert_dir_path();
2485 fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
2486 drop(inner);
2487
2488 assert!(storage.load_state_cert().await.unwrap().is_none());
2489
2490 for i in 0..rows {
2491 let view = ViewNumber::new(i);
2492 let validated_state = ValidatedState::default();
2493 let instance_state = NodeState::default();
2494
2495 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
2496 let (payload, metadata) =
2497 Payload::from_transactions([], &validated_state, &instance_state)
2498 .await
2499 .unwrap();
2500
2501 let payload_bytes = payload.encode();
2502
2503 let block_header = Header::genesis(
2504 &instance_state,
2505 payload.clone(),
2506 &metadata,
2507 TEST_VERSIONS.test.base,
2508 );
2509
2510 let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
2511 epoch: EpochNumber::new(i),
2512 light_client_state: LightClientState {
2513 view_number: i,
2514 block_height: i,
2515 block_comm_root: Default::default(),
2516 },
2517 next_stake_table_state: Default::default(),
2518 signatures: vec![], auth_root: Default::default(),
2520 };
2521 assert!(storage.add_state_cert(state_cert).await.is_ok());
2522
2523 let null_quorum_data = QuorumData {
2524 leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
2525 };
2526
2527 let justify_qc = QuorumCertificate::new(
2528 null_quorum_data.clone(),
2529 null_quorum_data.commit(),
2530 view,
2531 None,
2532 PhantomData,
2533 );
2534
2535 let quorum_proposal = QuorumProposal {
2536 block_header,
2537 view_number: view,
2538 justify_qc: justify_qc.clone(),
2539 upgrade_certificate: None,
2540 proposal_certificate: None,
2541 };
2542
2543 let quorum_proposal_signature =
2544 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2545 .expect("Failed to sign quorum proposal");
2546
2547 let proposal = Proposal {
2548 data: quorum_proposal.clone(),
2549 signature: quorum_proposal_signature,
2550 _pd: PhantomData::<SeqTypes>,
2551 };
2552
2553 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
2554 leaf.fill_block_payload(
2555 payload,
2556 GENESIS_VID_NUM_STORAGE_NODES,
2557 TEST_VERSIONS.test.base,
2558 )
2559 .unwrap();
2560
2561 let mut inner = storage.inner.write().await;
2562
2563 tracing::debug!("inserting decided leaves");
2564 let file_path = decided_leaves_path
2565 .join(view.to_string())
2566 .with_extension("txt");
2567
2568 tracing::debug!("inserting decided leaves");
2569
2570 inner
2571 .replace(
2572 &file_path,
2573 |_| Ok(true),
2574 |mut file| {
2575 let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
2576 file.write_all(&bytes)?;
2577 Ok(())
2578 },
2579 )
2580 .expect("replace decided leaves");
2581
2582 let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");
2583
2584 tracing::debug!("inserting qc for {view}");
2585
2586 inner
2587 .replace(
2588 &file_path,
2589 |_| Ok(true),
2590 |mut file| {
2591 let proposal_bytes =
2592 bincode::serialize(&proposal).context("serialize proposal")?;
2593
2594 file.write_all(&proposal_bytes)?;
2595 Ok(())
2596 },
2597 )
2598 .unwrap();
2599
2600 drop(inner);
2601 let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
2602 .disperse(payload_bytes.clone())
2603 .unwrap();
2604
2605 let vid = VidDisperseShare0::<SeqTypes> {
2606 view_number: ViewNumber::new(i),
2607 payload_commitment: Default::default(),
2608 share: disperse.shares[0].clone(),
2609 common: disperse.common,
2610 recipient_key: pubkey,
2611 };
2612
2613 let (payload, metadata) =
2614 Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
2615 .await
2616 .unwrap();
2617
2618 let da = DaProposal::<SeqTypes> {
2619 encoded_transactions: payload.encode(),
2620 metadata,
2621 view_number: ViewNumber::new(i),
2622 };
2623
2624 let block_payload_signature =
2625 BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
2626
2627 let da_proposal = Proposal {
2628 data: da,
2629 signature: block_payload_signature,
2630 _pd: Default::default(),
2631 };
2632
2633 tracing::debug!("inserting vid for {view}");
2634 storage
2635 .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
2636 .await
2637 .unwrap();
2638
2639 tracing::debug!("inserting da for {view}");
2640 storage
2641 .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
2642 .await
2643 .unwrap();
2644 }
2645
2646 storage.migrate_storage().await.unwrap();
2647 let inner = storage.inner.read().await;
2648 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2649 let decided_leaves_count = decided_leaves
2650 .filter_map(Result::ok)
2651 .filter(|e| e.path().is_file())
2652 .count();
2653 assert_eq!(
2654 decided_leaves_count, rows as usize,
2655 "decided leaves count does not match",
2656 );
2657
2658 let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
2659 let da_proposals_count = da_proposals
2660 .filter_map(Result::ok)
2661 .filter(|e| e.path().is_file())
2662 .count();
2663 assert_eq!(
2664 da_proposals_count, rows as usize,
2665 "da proposals does not match",
2666 );
2667
2668 let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
2669 let vids_count = vids
2670 .filter_map(Result::ok)
2671 .filter(|e| e.path().is_file())
2672 .count();
2673 assert_eq!(vids_count, rows as usize, "vid shares count does not match",);
2674
2675 let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
2676 let qps_count = qps
2677 .filter_map(Result::ok)
2678 .filter(|e| e.path().is_file())
2679 .count();
2680 assert_eq!(
2681 qps_count, rows as usize,
2682 "quorum proposals count does not match",
2683 );
2684
2685 let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
2686 let state_cert_count = state_certs
2687 .filter_map(Result::ok)
2688 .filter(|e| e.path().is_file())
2689 .count();
2690 assert_eq!(
2691 state_cert_count, rows as usize,
2692 "light client state update certificate count does not match",
2693 );
2694
2695 let storage = opt.create().await.unwrap();
2699 storage.migrate_storage().await.unwrap();
2700
2701 let inner = storage.inner.read().await;
2702 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2703 let decided_leaves_count = decided_leaves
2704 .filter_map(Result::ok)
2705 .filter(|e| e.path().is_file())
2706 .count();
2707 assert_eq!(
2708 decided_leaves_count, rows as usize,
2709 "decided leaves count does not match",
2710 );
2711 }
2712
2713 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2714 async fn test_load_quorum_proposals_invalid_extension() {
2715 let tmp = Persistence::tmp_storage().await;
2716 let storage = Persistence::connect(&tmp).await;
2717
2718 let leaf = Leaf2::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base).await;
2720 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2721 let signature = PubKey::sign(&privkey, &[]).unwrap();
2722 let mut quorum_proposal = Proposal {
2723 data: QuorumProposalWrapper::<SeqTypes> {
2724 proposal: QuorumProposal2::<SeqTypes> {
2725 epoch: None,
2726 block_header: leaf.block_header().clone(),
2727 view_number: ViewNumber::genesis(),
2728 justify_qc: QuorumCertificate2::genesis(
2729 &Default::default(),
2730 &NodeState::mock(),
2731 TEST_VERSIONS.test,
2732 )
2733 .await,
2734 upgrade_certificate: None,
2735 view_change_evidence: None,
2736 next_drb_result: None,
2737 next_epoch_justify_qc: None,
2738 state_cert: None,
2739 },
2740 },
2741 signature,
2742 _pd: Default::default(),
2743 };
2744
2745 let quorum_proposal1 = quorum_proposal.clone();
2747 storage
2748 .append_quorum_proposal2(&quorum_proposal1)
2749 .await
2750 .unwrap();
2751 quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2752 let quorum_proposal2 = quorum_proposal.clone();
2753 storage
2754 .append_quorum_proposal2(&quorum_proposal2)
2755 .await
2756 .unwrap();
2757
2758 fs::rename(
2761 tmp.path().join("quorum_proposals2/1.txt"),
2762 tmp.path().join("quorum_proposals2/1.swp"),
2763 )
2764 .unwrap();
2765
2766 assert_eq!(
2768 storage.load_quorum_proposals().await.unwrap(),
2769 [(ViewNumber::genesis(), quorum_proposal1)]
2770 .into_iter()
2771 .collect::<BTreeMap<_, _>>()
2772 );
2773 }
2774
2775 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2776 async fn test_load_quorum_proposals_malformed_data() {
2777 let tmp = Persistence::tmp_storage().await;
2778 let storage = Persistence::connect(&tmp).await;
2779
2780 let leaf: Leaf2 = Leaf::genesis(&Default::default(), &NodeState::mock(), MOCK_UPGRADE.base)
2782 .await
2783 .into();
2784 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2785 let signature = PubKey::sign(&privkey, &[]).unwrap();
2786 let quorum_proposal = Proposal {
2787 data: QuorumProposalWrapper::<SeqTypes> {
2788 proposal: QuorumProposal2::<SeqTypes> {
2789 epoch: None,
2790 block_header: leaf.block_header().clone(),
2791 view_number: ViewNumber::new(1),
2792 justify_qc: QuorumCertificate2::genesis(
2793 &Default::default(),
2794 &NodeState::mock(),
2795 TEST_VERSIONS.test,
2796 )
2797 .await,
2798 upgrade_certificate: None,
2799 view_change_evidence: None,
2800 next_drb_result: None,
2801 next_epoch_justify_qc: None,
2802 state_cert: None,
2803 },
2804 },
2805 signature,
2806 _pd: Default::default(),
2807 };
2808
2809 fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2811 fs::write(
2812 tmp.path().join("quorum_proposals2/0.txt"),
2813 "invalid data".as_bytes(),
2814 )
2815 .unwrap();
2816
2817 storage
2819 .append_quorum_proposal2(&quorum_proposal)
2820 .await
2821 .unwrap();
2822
2823 assert_eq!(
2825 storage.load_quorum_proposals().await.unwrap(),
2826 [(ViewNumber::new(1), quorum_proposal)]
2827 .into_iter()
2828 .collect::<BTreeMap<_, _>>()
2829 );
2830 }
2831
2832 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2833 async fn test_store_events_empty() {
2834 let tmp = Persistence::tmp_storage().await;
2835 let mut opt = Persistence::options(&tmp);
2836 let storage = opt.create().await.unwrap();
2837
2838 assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
2839
2840 for i in 1..=2 {
2842 tracing::info!(i, "update l1 height");
2843 storage.store_events(i, vec![]).await.unwrap();
2844 assert_eq!(
2845 storage.load_events(0, 100).await.unwrap(),
2846 (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
2847 );
2848 }
2849 }
2850
2851 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2852 async fn test_migrate_x25519_keys() {
2853 use std::collections::HashMap;
2854
2855 use alloy::primitives::{Address, U256};
2856 use indexmap::IndexMap;
2857
2858 use crate::persistence::RegisteredValidatorNoX25519;
2859
2860 let tmp = Persistence::tmp_storage().await;
2861 let mut opt = Persistence::options(&tmp);
2862 let storage = opt.create().await.unwrap();
2863
2864 let addr = Address::random();
2865 let legacy_validator = RegisteredValidatorNoX25519 {
2866 account: addr,
2867 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2868 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2869 stake: U256::from(1000),
2870 commission: 100,
2871 delegators: HashMap::new(),
2872 authenticated: true,
2873 };
2874
2875 let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2877 legacy_map.insert(addr, legacy_validator);
2878
2879 type LegacyTuple = (
2880 IndexMap<Address, RegisteredValidatorNoX25519>,
2881 Option<RewardAmount>,
2882 Option<StakeTableHash>,
2883 );
2884 let legacy_data: LegacyTuple = (legacy_map, None, None);
2885 let bytes = bincode::serialize(&legacy_data).unwrap();
2886
2887 let inner = storage.inner.read().await;
2888 let path = inner.stake_table_dir_path();
2889 drop(inner);
2890 fs::create_dir_all(&path).unwrap();
2891 fs::write(path.join("1.txt"), &bytes).unwrap();
2892
2893 let json_validator = RegisteredValidatorNoX25519 {
2895 account: addr,
2896 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2897 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2898 stake: U256::from(2000),
2899 commission: 200,
2900 delegators: HashMap::new(),
2901 authenticated: true,
2902 };
2903 let mut json_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
2904 json_map.insert(addr, json_validator);
2905 let validators_dir = path.join("validators");
2906 fs::create_dir_all(&validators_dir).unwrap();
2907 let json_path = validators_dir.join("epoch_1.json");
2908 fs::write(&json_path, serde_json::to_string_pretty(&json_map).unwrap()).unwrap();
2909
2910 storage.migrate_x25519_keys().await.unwrap();
2912
2913 let result = storage.load_stake(EpochNumber::new(1)).await.unwrap();
2915 assert!(result.is_some());
2916 let (validators, reward, hash) = result.unwrap();
2917 assert_eq!(validators.len(), 1);
2918 let v = validators.get(&addr).unwrap();
2919 assert_eq!(v.stake, U256::from(1000));
2920 assert!(v.x25519_key.is_none());
2921 assert!(v.p2p_addr.is_none());
2922 assert!(reward.is_none());
2923 assert!(hash.is_none());
2924
2925 let json_content = fs::read_to_string(&json_path).unwrap();
2927 let parsed: espresso_types::RegisteredValidatorMap =
2928 serde_json::from_str(&json_content).unwrap();
2929 assert_eq!(parsed.len(), 1);
2930 let json_v = parsed.get(&addr).unwrap();
2931 assert_eq!(json_v.stake, U256::from(2000));
2932 assert!(json_v.x25519_key.is_none());
2933
2934 storage.migrate_x25519_keys().await.unwrap();
2936 let result2 = storage.load_stake(EpochNumber::new(1)).await;
2937 assert!(result2.is_ok());
2938 }
2939
2940 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2941 async fn test_migrate_x25519_keys_no_stake_dir() {
2942 let tmp = Persistence::tmp_storage().await;
2943 let mut opt = Persistence::options(&tmp);
2944 let storage = opt.create().await.unwrap();
2945
2946 storage.migrate_x25519_keys().await.unwrap();
2948
2949 let inner = storage.inner.read().await;
2951 let path = inner.stake_table_dir_path();
2952 drop(inner);
2953 fs::create_dir_all(&path).unwrap();
2954 fs::write(path.join("1.txt"), b"garbage").unwrap();
2955
2956 storage.migrate_x25519_keys().await.unwrap();
2958 }
2959
2960 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2961 async fn test_store_all_validators_authenticated_and_unauthenticated() {
2962 use std::collections::HashMap;
2963
2964 use alloy::primitives::{Address, U256};
2965 use espresso_types::v0_3::RegisteredValidator;
2966 use indexmap::IndexMap;
2967
2968 let tmp = Persistence::tmp_storage().await;
2969 let mut opt = Persistence::options(&tmp);
2970 let storage = opt.create().await.unwrap();
2971
2972 let authenticated_validator = RegisteredValidator {
2974 account: Address::random(),
2975 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
2976 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2977 stake: U256::from(1000),
2978 commission: 100,
2979 delegators: HashMap::new(),
2980 authenticated: true,
2981 x25519_key: None,
2982 p2p_addr: None,
2983 };
2984
2985 let unauthenticated_validator = RegisteredValidator {
2987 account: Address::random(),
2988 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
2989 state_ver_key: hotshot_types::light_client::StateVerKey::default(),
2990 stake: U256::from(2000),
2991 commission: 200,
2992 delegators: HashMap::new(),
2993 authenticated: false,
2994 x25519_key: None,
2995 p2p_addr: None,
2996 };
2997
2998 let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
2999 validators.insert(
3000 authenticated_validator.account,
3001 authenticated_validator.clone(),
3002 );
3003 validators.insert(
3004 unauthenticated_validator.account,
3005 unauthenticated_validator.clone(),
3006 );
3007
3008 storage
3010 .store_all_validators(EpochNumber::new(1), validators)
3011 .await
3012 .unwrap();
3013
3014 let loaded = storage
3016 .load_all_validators(EpochNumber::new(1), 0, 100)
3017 .await
3018 .unwrap();
3019 assert_eq!(loaded.len(), 2);
3020
3021 let loaded_auth = loaded
3023 .iter()
3024 .find(|v| v.account == authenticated_validator.account)
3025 .unwrap();
3026 assert!(
3027 loaded_auth.authenticated,
3028 "authenticated validator should remain authenticated"
3029 );
3030
3031 let loaded_unauth = loaded
3032 .iter()
3033 .find(|v| v.account == unauthenticated_validator.account)
3034 .unwrap();
3035 assert!(
3036 !loaded_unauth.authenticated,
3037 "unauthenticated validator should remain unauthenticated"
3038 );
3039 }
3040}