// hotshot_new_protocol/storage.rs

1use std::{collections::BTreeMap, marker::PhantomData, time::Duration};
2
3use async_trait::async_trait;
4use hotshot::{traits::BlockPayload, types::SignatureKey};
5use hotshot_example_types::storage_types::TestStorage;
6use hotshot_types::{
7    data::{
8        DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
9        VidDisperseShare, VidDisperseShare2, ViewChangeEvidence2, ViewNumber,
10    },
11    message::Proposal as SignedProposal,
12    simple_certificate::LightClientStateUpdateCertificateV2,
13    traits::{EncodeBytes, node_implementation::NodeType, storage::Storage as StorageTrait},
14    utils::EpochTransitionIndicator,
15};
16use tokio::{spawn, task::JoinHandle, time::sleep};
17use tracing::{error, warn};
18
19use crate::message::{Certificate2, Proposal};
20
/// Pause between attempts when a background storage write fails and is retried.
const RETRY_DELAY: Duration = Duration::from_millis(300);
22
/// New protocol storage extension for data that is not part of the legacy HotShot storage trait.
///
/// Implementors must also implement the legacy [`StorageTrait`]; this trait only adds
/// the operations below on top of it.
#[async_trait]
pub trait NewProtocolStorage<T: NodeType>: StorageTrait<T> {
    /// Stores `cert` in association with `view`.
    ///
    /// # Errors
    ///
    /// Returns an error when the write fails. The `Storage` wrapper in this file
    /// treats any error as retryable and calls this method again after a delay.
    async fn append_cert2(&self, view: ViewNumber, cert: Certificate2<T>) -> anyhow::Result<()>;
}
28
/// Wrapper around a storage backend `S` that performs its writes in spawned
/// background tasks and remembers the task handles per view.
pub struct Storage<T: NodeType, S> {
    /// The wrapped storage backend; cloned into every spawned write task.
    storage: S,
    /// Key used to sign proposals before they are handed to the backend.
    private_key: <T::SignatureKey as SignatureKey>::PrivateKey,
    /// Handles of the spawned write tasks, grouped by the view they belong to,
    /// so that `gc` can abort the tasks of old views.
    handles: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
}
34
35impl<T: NodeType, S: NewProtocolStorage<T>> Storage<T, S> {
36    pub fn new(storage: S, private_key: <T::SignatureKey as SignatureKey>::PrivateKey) -> Self {
37        Self {
38            storage,
39            private_key,
40            handles: BTreeMap::new(),
41        }
42    }
43
44    pub fn append_vid(&mut self, vid_share: VidDisperseShare2<T>) {
45        let view = vid_share.view_number;
46        let storage = self.storage.clone();
47        let private_key = self.private_key.clone();
48        let handle = spawn(async move {
49            let share: VidDisperseShare<T> = VidDisperseShare::V2(vid_share);
50            let Some(proposal) = share.to_proposal(&private_key) else {
51                error!("failed to sign VID share for storage");
52                return;
53            };
54            loop {
55                match storage.append_vid(&proposal).await {
56                    Ok(()) => return,
57                    Err(err) => {
58                        warn!(%err, "failed to append VID share, retrying");
59                        sleep(RETRY_DELAY).await;
60                    },
61                }
62            }
63        });
64        self.handles.entry(view).or_default().push(handle);
65    }
66
67    pub fn append_da(
68        &mut self,
69        view_number: ViewNumber,
70        epoch: EpochNumber,
71        block_payload: T::BlockPayload,
72        metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
73        vid_commit: VidCommitment,
74    ) {
75        let storage = self.storage.clone();
76        let private_key = self.private_key.clone();
77        let handle = spawn(async move {
78            let data = DaProposal2 {
79                encoded_transactions: block_payload.encode(),
80                metadata,
81                view_number,
82                epoch: Some(epoch),
83                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
84            };
85            let Ok(signature) = T::SignatureKey::sign(&private_key, &[]) else {
86                error!("failed to sign DA proposal for storage");
87                return;
88            };
89            let proposal = SignedProposal {
90                data,
91                signature,
92                _pd: PhantomData,
93            };
94            loop {
95                match storage.append_da2(&proposal, vid_commit).await {
96                    Ok(()) => return,
97                    Err(err) => {
98                        warn!(%err, "failed to append DA proposal, retrying");
99                        sleep(RETRY_DELAY).await;
100                    },
101                }
102            }
103        });
104        self.handles.entry(view_number).or_default().push(handle);
105    }
106
107    pub fn append_cert2(&mut self, view: ViewNumber, cert2: Certificate2<T>) {
108        let storage = self.storage.clone();
109        let handle = spawn(async move {
110            loop {
111                match storage.append_cert2(view, cert2.clone()).await {
112                    Ok(()) => return,
113                    Err(err) => {
114                        warn!(%err, %view, "failed to append cert2, retrying");
115                        sleep(RETRY_DELAY).await;
116                    },
117                }
118            }
119        });
120        self.handles.entry(view).or_default().push(handle);
121    }
122
123    pub fn append_state_cert(
124        &mut self,
125        view: ViewNumber,
126        state_cert: LightClientStateUpdateCertificateV2<T>,
127    ) {
128        let storage = self.storage.clone();
129        let handle = spawn(async move {
130            loop {
131                match storage.update_state_cert(state_cert.clone()).await {
132                    Ok(()) => return,
133                    Err(err) => {
134                        warn!(%err, epoch = %state_cert.epoch, "failed to append state cert, retrying");
135                        sleep(RETRY_DELAY).await;
136                    },
137                }
138            }
139        });
140        self.handles.entry(view).or_default().push(handle);
141    }
142
143    pub fn append_proposal(&mut self, proposal: Proposal<T>) {
144        let view = proposal.view_number;
145        let storage = self.storage.clone();
146        let private_key = self.private_key.clone();
147        let handle = spawn(async move {
148            let data = QuorumProposalWrapper {
149                proposal: QuorumProposal2 {
150                    block_header: proposal.block_header,
151                    view_number: proposal.view_number,
152                    epoch: Some(proposal.epoch),
153                    justify_qc: proposal.justify_qc,
154                    next_epoch_justify_qc: None,
155                    upgrade_certificate: proposal.upgrade_certificate,
156                    view_change_evidence: proposal
157                        .view_change_evidence
158                        .map(ViewChangeEvidence2::Timeout),
159                    next_drb_result: proposal.next_drb_result,
160                    state_cert: proposal.state_cert,
161                },
162            };
163            let Ok(signature) = T::SignatureKey::sign(&private_key, &[]) else {
164                error!("failed to sign quorum proposal for storage");
165                return;
166            };
167            let signed = SignedProposal {
168                data,
169                signature,
170                _pd: PhantomData,
171            };
172            loop {
173                match storage.append_proposal_wrapper(&signed).await {
174                    Ok(()) => return,
175                    Err(err) => {
176                        warn!(%err, "failed to append proposal, retrying");
177                        sleep(RETRY_DELAY).await;
178                    },
179                }
180            }
181        });
182        self.handles.entry(view).or_default().push(handle);
183    }
184
185    pub fn gc(&mut self, view_number: ViewNumber) {
186        let keep = self.handles.split_off(&view_number);
187        for handles in self.handles.values() {
188            for handle in handles {
189                handle.abort();
190            }
191        }
192        self.handles = keep;
193    }
194}
195
/// `NewProtocolStorage` support for the example `TestStorage`.
#[async_trait]
impl<T: NodeType> NewProtocolStorage<T> for TestStorage<T> {
    /// No-op: both arguments are ignored and the call always returns `Ok(())`,
    /// so nothing is stored by this implementation.
    async fn append_cert2(&self, _view: ViewNumber, _cert: Certificate2<T>) -> anyhow::Result<()> {
        Ok(())
    }
}
201}