1use std::{sync::Arc, time::Duration};
13
14use anyhow::{Result, anyhow, ensure};
15use async_trait::async_trait;
16use futures::future::BoxFuture;
17use tokio::time::sleep;
18
19use super::node_implementation::NodeType;
20use crate::{
21 data::{
22 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
23 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
24 },
25 drb::{DrbInput, DrbResult},
26 event::HotShotAction,
27 message::{Proposal, convert_proposal},
28 simple_certificate::{
29 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
30 QuorumCertificate2, UpgradeCertificate,
31 },
32};
33
34#[async_trait]
36pub trait Storage<TYPES: NodeType>: Send + Sync + Clone + 'static {
37 async fn append_vid(&self, proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>) -> Result<()>;
39
40 async fn append_da(
42 &self,
43 proposal: &Proposal<TYPES, DaProposal<TYPES>>,
44 vid_commit: VidCommitment,
45 ) -> Result<()>;
46 async fn append_da2(
48 &self,
49 proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
50 vid_commit: VidCommitment,
51 ) -> Result<()> {
52 self.append_da(&convert_proposal(proposal.clone()), vid_commit)
53 .await
54 }
55 async fn append_proposal(
57 &self,
58 proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
59 ) -> Result<()>;
60 async fn append_proposal2(
62 &self,
63 proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
64 ) -> Result<()>;
65 async fn append_proposal_wrapper(
67 &self,
68 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
69 ) -> Result<()> {
70 self.append_proposal2(&convert_proposal(proposal.clone()))
71 .await
72 }
73 async fn record_action(
75 &self,
76 view: ViewNumber,
77 epoch: Option<EpochNumber>,
78 action: HotShotAction,
79 ) -> Result<()>;
80 async fn update_high_qc(&self, high_qc: QuorumCertificate<TYPES>) -> Result<()>;
82 async fn update_high_qc2(&self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
84 self.update_high_qc(high_qc.to_qc()).await
85 }
86 async fn update_state_cert(
88 &self,
89 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
90 ) -> Result<()>;
91
92 async fn update_high_qc2_and_state_cert(
93 &self,
94 high_qc: QuorumCertificate2<TYPES>,
95 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
96 ) -> Result<()> {
97 self.update_high_qc2(high_qc).await?;
98 self.update_state_cert(state_cert).await
99 }
100 async fn update_next_epoch_high_qc2(
102 &self,
103 _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
104 ) -> Result<()>;
105
106 async fn update_eqc(
108 &self,
109 _high_qc: QuorumCertificate2<TYPES>,
110 _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
111 ) -> Result<()>;
112
113 async fn update_decided_upgrade_certificate(
115 &self,
116 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
117 ) -> Result<()>;
118 async fn migrate_storage(&self) -> Result<()> {
120 Ok(())
121 }
122 async fn store_drb_result(&self, epoch: EpochNumber, drb_result: DrbResult) -> Result<()>;
124 async fn store_epoch_root(
126 &self,
127 epoch: EpochNumber,
128 block_header: TYPES::BlockHeader,
129 ) -> Result<()>;
130 async fn load_drb_result(&self, epoch: EpochNumber) -> Result<DrbResult> {
131 match self.load_drb_input(*epoch).await {
132 Ok(drb_input) => {
133 ensure!(drb_input.iteration == drb_input.difficulty_level);
134
135 Ok(drb_input.value)
136 },
137 Err(e) => Err(e),
138 }
139 }
140 async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()>;
141 async fn load_drb_input(&self, _epoch: u64) -> Result<DrbInput>;
142}
143
144pub async fn load_drb_input_impl<TYPES: NodeType>(
145 storage: impl Storage<TYPES>,
146 epoch: u64,
147) -> Result<DrbInput> {
148 storage.load_drb_input(epoch).await
149}
150
151pub type LoadDrbProgressFn =
152 std::sync::Arc<dyn Fn(u64) -> BoxFuture<'static, Result<DrbInput>> + Send + Sync>;
153
154pub fn load_drb_progress_fn<TYPES: NodeType>(
155 storage: impl Storage<TYPES> + 'static,
156) -> LoadDrbProgressFn {
157 Arc::new(move |epoch| {
158 let storage = storage.clone();
159 Box::pin(load_drb_input_impl(storage, epoch))
160 })
161}
162
163pub fn null_load_drb_progress_fn() -> LoadDrbProgressFn {
164 Arc::new(move |_drb_input| {
165 Box::pin(async { Err(anyhow!("Using null implementation of load_drb_input")) })
166 })
167}
168
169pub async fn store_drb_input_impl<TYPES: NodeType>(
170 storage: impl Storage<TYPES>,
171 drb_input: DrbInput,
172) -> Result<()> {
173 for attempt in 1..=3 {
174 match storage.store_drb_input(drb_input.clone()).await {
175 Ok(()) => return Ok(()),
176 Err(e) if attempt < 3 => {
177 tracing::warn!("Failed to store DRB input (attempt {attempt}/3): {e}");
178 sleep(Duration::from_millis(300)).await;
179 },
180 Err(e) => {
181 tracing::warn!("Failed to store DRB input (attempt {attempt}/3): {e}");
182 return Err(e);
183 },
184 }
185 }
186
187 Ok(())
188}
189
190pub type StoreDrbProgressFn =
191 std::sync::Arc<dyn Fn(DrbInput) -> BoxFuture<'static, Result<()>> + Send + Sync>;
192
193pub fn store_drb_progress_fn<TYPES: NodeType>(
194 storage: impl Storage<TYPES> + 'static,
195) -> StoreDrbProgressFn {
196 Arc::new(move |drb_input| {
197 let storage = storage.clone();
198 Box::pin(store_drb_input_impl(storage, drb_input))
199 })
200}
201
202pub fn null_store_drb_progress_fn() -> StoreDrbProgressFn {
203 Arc::new(move |_drb_input| Box::pin(async { Ok(()) }))
204}
205
206pub type StoreDrbResultFn = Arc<
207 Box<dyn Fn(EpochNumber, DrbResult) -> BoxFuture<'static, Result<()>> + Send + Sync + 'static>,
208>;
209
210async fn store_drb_result_impl<TYPES: NodeType>(
211 storage: impl Storage<TYPES>,
212 epoch: EpochNumber,
213 drb_result: DrbResult,
214) -> Result<()> {
215 for attempt in 1..=3 {
216 match storage.store_drb_result(epoch, drb_result).await {
217 Ok(()) => return Ok(()),
218 Err(e) if attempt < 3 => {
219 tracing::warn!(
220 "Failed to store DRB result for epoch {epoch} (attempt {attempt}/3): {e}"
221 );
222 sleep(Duration::from_millis(300)).await;
223 },
224 Err(e) => {
225 tracing::warn!(
226 "Failed to store DRB result for epoch {epoch} (attempt {attempt}/3): {e}"
227 );
228 return Err(e);
229 },
230 }
231 }
232 Ok(())
233}
234
235pub fn store_drb_result_fn<TYPES: NodeType>(
237 storage: impl Storage<TYPES> + 'static,
238) -> StoreDrbResultFn {
239 Arc::new(Box::new(move |epoch, drb_result| {
240 let st = storage.clone();
241 Box::pin(store_drb_result_impl(st, epoch, drb_result))
242 }))
243}