hotshot_types/traits/
storage.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Abstract storage type for storing DA proposals and VID shares
8//!
9//! This modules provides the [`Storage`] trait.
10//!
11
12use std::{sync::Arc, time::Duration};
13
14use anyhow::{Result, anyhow, ensure};
15use async_trait::async_trait;
16use futures::future::BoxFuture;
17use tokio::time::sleep;
18
19use super::node_implementation::NodeType;
20use crate::{
21    data::{
22        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
23        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
24    },
25    drb::{DrbInput, DrbResult},
26    event::HotShotAction,
27    message::{Proposal, convert_proposal},
28    simple_certificate::{
29        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
30        QuorumCertificate2, UpgradeCertificate,
31    },
32};
33
34/// Abstraction for storing a variety of consensus payload datum.
35#[async_trait]
36pub trait Storage<TYPES: NodeType>: Send + Sync + Clone + 'static {
37    /// Add a proposal to the stored VID proposals.
38    async fn append_vid(&self, proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>) -> Result<()>;
39
40    /// Add a proposal to the stored DA proposals.
41    async fn append_da(
42        &self,
43        proposal: &Proposal<TYPES, DaProposal<TYPES>>,
44        vid_commit: VidCommitment,
45    ) -> Result<()>;
46    /// Add a proposal to the stored DA proposals.
47    async fn append_da2(
48        &self,
49        proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
50        vid_commit: VidCommitment,
51    ) -> Result<()> {
52        self.append_da(&convert_proposal(proposal.clone()), vid_commit)
53            .await
54    }
55    /// Add a proposal we sent to the store
56    async fn append_proposal(
57        &self,
58        proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
59    ) -> Result<()>;
60    /// Add a proposal we sent to the store
61    async fn append_proposal2(
62        &self,
63        proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
64    ) -> Result<()>;
65    /// Add a proposal we sent to the store
66    async fn append_proposal_wrapper(
67        &self,
68        proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
69    ) -> Result<()> {
70        self.append_proposal2(&convert_proposal(proposal.clone()))
71            .await
72    }
73    /// Record a HotShotAction taken.
74    async fn record_action(
75        &self,
76        view: ViewNumber,
77        epoch: Option<EpochNumber>,
78        action: HotShotAction,
79    ) -> Result<()>;
80    /// Update the current high QC in storage.
81    async fn update_high_qc(&self, high_qc: QuorumCertificate<TYPES>) -> Result<()>;
82    /// Update the current high QC in storage.
83    async fn update_high_qc2(&self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
84        self.update_high_qc(high_qc.to_qc()).await
85    }
86    /// Update the light client state update certificate in storage.
87    async fn update_state_cert(
88        &self,
89        state_cert: LightClientStateUpdateCertificateV2<TYPES>,
90    ) -> Result<()>;
91
92    async fn update_high_qc2_and_state_cert(
93        &self,
94        high_qc: QuorumCertificate2<TYPES>,
95        state_cert: LightClientStateUpdateCertificateV2<TYPES>,
96    ) -> Result<()> {
97        self.update_high_qc2(high_qc).await?;
98        self.update_state_cert(state_cert).await
99    }
100    /// Update the current high QC in storage.
101    async fn update_next_epoch_high_qc2(
102        &self,
103        _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
104    ) -> Result<()>;
105
106    /// Update the current eQC in storage.
107    async fn update_eqc(
108        &self,
109        _high_qc: QuorumCertificate2<TYPES>,
110        _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
111    ) -> Result<()>;
112
113    /// Upgrade the current decided upgrade certificate in storage.
114    async fn update_decided_upgrade_certificate(
115        &self,
116        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
117    ) -> Result<()>;
118    /// Migrate leaves from `Leaf` to `Leaf2`, and proposals from `QuorumProposal` to `QuorumProposal2`
119    async fn migrate_storage(&self) -> Result<()> {
120        Ok(())
121    }
122    /// Add a drb result
123    async fn store_drb_result(&self, epoch: EpochNumber, drb_result: DrbResult) -> Result<()>;
124    /// Add an epoch block header
125    async fn store_epoch_root(
126        &self,
127        epoch: EpochNumber,
128        block_header: TYPES::BlockHeader,
129    ) -> Result<()>;
130    async fn load_drb_result(&self, epoch: EpochNumber) -> Result<DrbResult> {
131        match self.load_drb_input(*epoch).await {
132            Ok(drb_input) => {
133                ensure!(drb_input.iteration == drb_input.difficulty_level);
134
135                Ok(drb_input.value)
136            },
137            Err(e) => Err(e),
138        }
139    }
140    async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()>;
141    async fn load_drb_input(&self, _epoch: u64) -> Result<DrbInput>;
142}
143
144pub async fn load_drb_input_impl<TYPES: NodeType>(
145    storage: impl Storage<TYPES>,
146    epoch: u64,
147) -> Result<DrbInput> {
148    storage.load_drb_input(epoch).await
149}
150
151pub type LoadDrbProgressFn =
152    std::sync::Arc<dyn Fn(u64) -> BoxFuture<'static, Result<DrbInput>> + Send + Sync>;
153
154pub fn load_drb_progress_fn<TYPES: NodeType>(
155    storage: impl Storage<TYPES> + 'static,
156) -> LoadDrbProgressFn {
157    Arc::new(move |epoch| {
158        let storage = storage.clone();
159        Box::pin(load_drb_input_impl(storage, epoch))
160    })
161}
162
163pub fn null_load_drb_progress_fn() -> LoadDrbProgressFn {
164    Arc::new(move |_drb_input| {
165        Box::pin(async { Err(anyhow!("Using null implementation of load_drb_input")) })
166    })
167}
168
169pub async fn store_drb_input_impl<TYPES: NodeType>(
170    storage: impl Storage<TYPES>,
171    drb_input: DrbInput,
172) -> Result<()> {
173    for attempt in 1..=3 {
174        match storage.store_drb_input(drb_input.clone()).await {
175            Ok(()) => return Ok(()),
176            Err(e) if attempt < 3 => {
177                tracing::warn!("Failed to store DRB input (attempt {attempt}/3): {e}");
178                sleep(Duration::from_millis(300)).await;
179            },
180            Err(e) => {
181                tracing::warn!("Failed to store DRB input (attempt {attempt}/3): {e}");
182                return Err(e);
183            },
184        }
185    }
186
187    Ok(())
188}
189
190pub type StoreDrbProgressFn =
191    std::sync::Arc<dyn Fn(DrbInput) -> BoxFuture<'static, Result<()>> + Send + Sync>;
192
193pub fn store_drb_progress_fn<TYPES: NodeType>(
194    storage: impl Storage<TYPES> + 'static,
195) -> StoreDrbProgressFn {
196    Arc::new(move |drb_input| {
197        let storage = storage.clone();
198        Box::pin(store_drb_input_impl(storage, drb_input))
199    })
200}
201
202pub fn null_store_drb_progress_fn() -> StoreDrbProgressFn {
203    Arc::new(move |_drb_input| Box::pin(async { Ok(()) }))
204}
205
206pub type StoreDrbResultFn = Arc<
207    Box<dyn Fn(EpochNumber, DrbResult) -> BoxFuture<'static, Result<()>> + Send + Sync + 'static>,
208>;
209
210async fn store_drb_result_impl<TYPES: NodeType>(
211    storage: impl Storage<TYPES>,
212    epoch: EpochNumber,
213    drb_result: DrbResult,
214) -> Result<()> {
215    for attempt in 1..=3 {
216        match storage.store_drb_result(epoch, drb_result).await {
217            Ok(()) => return Ok(()),
218            Err(e) if attempt < 3 => {
219                tracing::warn!(
220                    "Failed to store DRB result for epoch {epoch} (attempt {attempt}/3): {e}"
221                );
222                sleep(Duration::from_millis(300)).await;
223            },
224            Err(e) => {
225                tracing::warn!(
226                    "Failed to store DRB result for epoch {epoch} (attempt {attempt}/3): {e}"
227                );
228                return Err(e);
229            },
230        }
231    }
232    Ok(())
233}
234
235/// Helper function to create a callback to add a drb result to storage
236pub fn store_drb_result_fn<TYPES: NodeType>(
237    storage: impl Storage<TYPES> + 'static,
238) -> StoreDrbResultFn {
239    Arc::new(Box::new(move |epoch, drb_result| {
240        let st = storage.clone();
241        Box::pin(store_drb_result_impl(st, epoch, drb_result))
242    }))
243}