1use std::{collections::BTreeMap, marker::PhantomData, time::Duration};
2
3use async_trait::async_trait;
4use hotshot::{traits::BlockPayload, types::SignatureKey};
5use hotshot_example_types::storage_types::TestStorage;
6use hotshot_types::{
7 data::{
8 DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
9 VidDisperseShare, VidDisperseShare2, ViewChangeEvidence2, ViewNumber,
10 },
11 message::Proposal as SignedProposal,
12 simple_certificate::LightClientStateUpdateCertificateV2,
13 traits::{EncodeBytes, node_implementation::NodeType, storage::Storage as StorageTrait},
14 utils::EpochTransitionIndicator,
15};
16use tokio::{spawn, task::JoinHandle, time::sleep};
17use tracing::{error, warn};
18
19use crate::message::{Certificate2, Proposal};
20
21const RETRY_DELAY: Duration = Duration::from_millis(300);
22
23#[async_trait]
25pub trait NewProtocolStorage<T: NodeType>: StorageTrait<T> {
26 async fn append_cert2(&self, view: ViewNumber, cert: Certificate2<T>) -> anyhow::Result<()>;
27}
28
29pub struct Storage<T: NodeType, S> {
30 storage: S,
31 private_key: <T::SignatureKey as SignatureKey>::PrivateKey,
32 handles: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
33}
34
35impl<T: NodeType, S: NewProtocolStorage<T>> Storage<T, S> {
36 pub fn new(storage: S, private_key: <T::SignatureKey as SignatureKey>::PrivateKey) -> Self {
37 Self {
38 storage,
39 private_key,
40 handles: BTreeMap::new(),
41 }
42 }
43
44 pub fn append_vid(&mut self, vid_share: VidDisperseShare2<T>) {
45 let view = vid_share.view_number;
46 let storage = self.storage.clone();
47 let private_key = self.private_key.clone();
48 let handle = spawn(async move {
49 let share: VidDisperseShare<T> = VidDisperseShare::V2(vid_share);
50 let Some(proposal) = share.to_proposal(&private_key) else {
51 error!("failed to sign VID share for storage");
52 return;
53 };
54 loop {
55 match storage.append_vid(&proposal).await {
56 Ok(()) => return,
57 Err(err) => {
58 warn!(%err, "failed to append VID share, retrying");
59 sleep(RETRY_DELAY).await;
60 },
61 }
62 }
63 });
64 self.handles.entry(view).or_default().push(handle);
65 }
66
67 pub fn append_da(
68 &mut self,
69 view_number: ViewNumber,
70 epoch: EpochNumber,
71 block_payload: T::BlockPayload,
72 metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
73 vid_commit: VidCommitment,
74 ) {
75 let storage = self.storage.clone();
76 let private_key = self.private_key.clone();
77 let handle = spawn(async move {
78 let data = DaProposal2 {
79 encoded_transactions: block_payload.encode(),
80 metadata,
81 view_number,
82 epoch: Some(epoch),
83 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
84 };
85 let Ok(signature) = T::SignatureKey::sign(&private_key, &[]) else {
86 error!("failed to sign DA proposal for storage");
87 return;
88 };
89 let proposal = SignedProposal {
90 data,
91 signature,
92 _pd: PhantomData,
93 };
94 loop {
95 match storage.append_da2(&proposal, vid_commit).await {
96 Ok(()) => return,
97 Err(err) => {
98 warn!(%err, "failed to append DA proposal, retrying");
99 sleep(RETRY_DELAY).await;
100 },
101 }
102 }
103 });
104 self.handles.entry(view_number).or_default().push(handle);
105 }
106
107 pub fn append_cert2(&mut self, view: ViewNumber, cert2: Certificate2<T>) {
108 let storage = self.storage.clone();
109 let handle = spawn(async move {
110 loop {
111 match storage.append_cert2(view, cert2.clone()).await {
112 Ok(()) => return,
113 Err(err) => {
114 warn!(%err, %view, "failed to append cert2, retrying");
115 sleep(RETRY_DELAY).await;
116 },
117 }
118 }
119 });
120 self.handles.entry(view).or_default().push(handle);
121 }
122
123 pub fn append_state_cert(
124 &mut self,
125 view: ViewNumber,
126 state_cert: LightClientStateUpdateCertificateV2<T>,
127 ) {
128 let storage = self.storage.clone();
129 let handle = spawn(async move {
130 loop {
131 match storage.update_state_cert(state_cert.clone()).await {
132 Ok(()) => return,
133 Err(err) => {
134 warn!(%err, epoch = %state_cert.epoch, "failed to append state cert, retrying");
135 sleep(RETRY_DELAY).await;
136 },
137 }
138 }
139 });
140 self.handles.entry(view).or_default().push(handle);
141 }
142
143 pub fn append_proposal(&mut self, proposal: Proposal<T>) {
144 let view = proposal.view_number;
145 let storage = self.storage.clone();
146 let private_key = self.private_key.clone();
147 let handle = spawn(async move {
148 let data = QuorumProposalWrapper {
149 proposal: QuorumProposal2 {
150 block_header: proposal.block_header,
151 view_number: proposal.view_number,
152 epoch: Some(proposal.epoch),
153 justify_qc: proposal.justify_qc,
154 next_epoch_justify_qc: None,
155 upgrade_certificate: proposal.upgrade_certificate,
156 view_change_evidence: proposal
157 .view_change_evidence
158 .map(ViewChangeEvidence2::Timeout),
159 next_drb_result: proposal.next_drb_result,
160 state_cert: proposal.state_cert,
161 },
162 };
163 let Ok(signature) = T::SignatureKey::sign(&private_key, &[]) else {
164 error!("failed to sign quorum proposal for storage");
165 return;
166 };
167 let signed = SignedProposal {
168 data,
169 signature,
170 _pd: PhantomData,
171 };
172 loop {
173 match storage.append_proposal_wrapper(&signed).await {
174 Ok(()) => return,
175 Err(err) => {
176 warn!(%err, "failed to append proposal, retrying");
177 sleep(RETRY_DELAY).await;
178 },
179 }
180 }
181 });
182 self.handles.entry(view).or_default().push(handle);
183 }
184
185 pub fn gc(&mut self, view_number: ViewNumber) {
186 let keep = self.handles.split_off(&view_number);
187 for handles in self.handles.values() {
188 for handle in handles {
189 handle.abort();
190 }
191 }
192 self.handles = keep;
193 }
194}
195
196#[async_trait]
197impl<T: NodeType> NewProtocolStorage<T> for TestStorage<T> {
198 async fn append_cert2(&self, _view: ViewNumber, _cert: Certificate2<T>) -> anyhow::Result<()> {
199 Ok(())
200 }
201}