Skip to main content

hotshot_new_protocol/
vid.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2
3use committable::Commitment;
4use hotshot::traits::BlockPayload;
5use hotshot_types::{
6    data::{EpochNumber, VidCommitment2, VidDisperse2, VidDisperseShare2, ViewNumber},
7    epoch_membership::EpochMembershipCoordinator,
8    traits::node_implementation::NodeType,
9    vid::avidm_gf2::{AvidmGf2Common, AvidmGf2Scheme, AvidmGf2Share},
10};
11use tokio::task::{AbortHandle, JoinSet};
12
13pub struct VidDisperseOutput<T: NodeType> {
14    pub view: ViewNumber,
15    pub payload_commitment: VidCommitment2,
16    pub disperse: VidDisperse2<T>,
17}
18
19pub struct VidReconstructOutput<T: NodeType> {
20    pub view: ViewNumber,
21    pub epoch: EpochNumber,
22    pub payload_commitment: VidCommitment2,
23    pub payload: T::BlockPayload,
24    pub metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
25    pub tx_commitments: Vec<Commitment<T::Transaction>>,
26}
27
28#[derive(Clone, Eq, PartialEq, Debug)]
29pub struct VidDisperseRequest<T: NodeType> {
30    pub view: ViewNumber,
31    pub epoch: EpochNumber,
32    pub block: T::BlockPayload,
33    pub metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
34}
35
36pub struct VidDisperser<T: NodeType> {
37    calculations: BTreeMap<ViewNumber, AbortHandle>,
38    epoch_membership_coordinator: EpochMembershipCoordinator<T>,
39    tasks: JoinSet<Result<VidDisperseOutput<T>, ()>>,
40}
41
42impl<T: NodeType> VidDisperser<T> {
43    pub fn new(epoch_membership_coordinator: EpochMembershipCoordinator<T>) -> Self {
44        Self {
45            calculations: BTreeMap::new(),
46            epoch_membership_coordinator,
47            tasks: JoinSet::new(),
48        }
49    }
50
51    pub fn request_vid_disperse(&mut self, vid_disperse_request: VidDisperseRequest<T>) {
52        let view = vid_disperse_request.view;
53        if self.calculations.contains_key(&view) {
54            return;
55        }
56        let handle = self.tasks.spawn(Self::handle_vid_disperse_request(
57            self.epoch_membership_coordinator.clone(),
58            vid_disperse_request,
59        ));
60        self.calculations.insert(view, handle);
61    }
62
63    pub async fn next(&mut self) -> Option<Result<VidDisperseOutput<T>, ()>> {
64        loop {
65            match self.tasks.join_next().await {
66                Some(Ok(result)) => return Some(result),
67                Some(Err(_)) => continue,
68                None => return None,
69            }
70        }
71    }
72
73    async fn handle_vid_disperse_request(
74        epoch_membership_coordinator: EpochMembershipCoordinator<T>,
75        vid_disperse_request: VidDisperseRequest<T>,
76    ) -> Result<VidDisperseOutput<T>, ()> {
77        let Ok((disperse, _duration)) = VidDisperse2::calculate_vid_disperse(
78            &vid_disperse_request.block,
79            &epoch_membership_coordinator,
80            vid_disperse_request.view,
81            Some(vid_disperse_request.epoch),
82            Some(vid_disperse_request.epoch),
83            &vid_disperse_request.metadata,
84        )
85        .await
86        else {
87            // TODO: Handle error
88            return Err(());
89        };
90        Ok(VidDisperseOutput {
91            view: vid_disperse_request.view,
92            payload_commitment: disperse.payload_commitment,
93            disperse,
94        })
95    }
96    pub fn gc(&mut self, view_number: ViewNumber) {
97        let keep = self.calculations.split_off(&view_number);
98        for handle in self.calculations.values_mut() {
99            handle.abort();
100        }
101        self.calculations = keep;
102    }
103}
104
105pub(crate) struct VidShareAccumulator<T: NodeType> {
106    shares: Vec<AvidmGf2Share>,
107    accumulated_weight: usize,
108    seen_keys: HashSet<T::SignatureKey>,
109    common: AvidmGf2Common,
110    metadata: Option<<T::BlockPayload as BlockPayload<T>>::Metadata>,
111    epoch: Option<EpochNumber>,
112}
113
114impl<T: NodeType> VidShareAccumulator<T> {
115    fn has_enough_shares(&self) -> bool {
116        self.accumulated_weight >= self.common.param.recovery_threshold
117    }
118}
119
120#[derive(Default)]
121pub struct VidReconstructor<T: NodeType> {
122    accumulators: BTreeMap<ViewNumber, VidShareAccumulator<T>>,
123    reconstructed: BTreeSet<ViewNumber>,
124    tasks: JoinSet<Result<VidReconstructOutput<T>, ()>>,
125    calculations: BTreeMap<ViewNumber, AbortHandle>,
126}
127
128impl<T: NodeType> VidReconstructor<T> {
129    pub fn new() -> Self {
130        Self {
131            accumulators: BTreeMap::new(),
132            reconstructed: BTreeSet::new(),
133            tasks: JoinSet::new(),
134            calculations: BTreeMap::new(),
135        }
136    }
137
138    pub(crate) fn handle_vid_share<M>(&mut self, share: VidDisperseShare2<T>, metadata: M)
139    where
140        M: Into<Option<<T::BlockPayload as BlockPayload<T>>::Metadata>>,
141    {
142        let view = share.view_number;
143        if self.reconstructed.contains(&view) {
144            return;
145        }
146        let payload_commitment = share.payload_commitment;
147        let recipient_key = share.recipient_key.clone();
148        let weight = share.share.weight();
149        let metadata = metadata.into();
150        let share_epoch = share.epoch;
151        let accumulator = self
152            .accumulators
153            .entry(view)
154            .or_insert_with(|| VidShareAccumulator {
155                shares: Vec::new(),
156                accumulated_weight: 0,
157                seen_keys: HashSet::new(),
158                common: share.common.clone(),
159                metadata: None,
160                epoch: share_epoch,
161            });
162        if accumulator.metadata.is_none()
163            && let Some(m) = metadata
164        {
165            accumulator.metadata = Some(m)
166        }
167        if accumulator.seen_keys.insert(recipient_key) {
168            accumulator.accumulated_weight += weight;
169            accumulator.shares.push(share.share);
170        }
171        if accumulator.has_enough_shares() {
172            self.try_reconstruct(view, payload_commitment);
173        }
174    }
175
176    pub async fn next(&mut self) -> Option<Result<VidReconstructOutput<T>, ()>> {
177        loop {
178            match self.tasks.join_next().await {
179                Some(Ok(Ok(out))) => {
180                    self.calculations.remove(&out.view);
181                    self.accumulators.remove(&out.view);
182                    self.reconstructed.insert(out.view);
183                    return Some(Ok(out));
184                },
185                Some(Ok(Err(()))) => {
186                    // TODO: Handle error
187                    return Some(Err(()));
188                },
189                Some(Err(_)) => continue,
190                None => return None,
191            }
192        }
193    }
194
195    fn try_reconstruct(&mut self, view: ViewNumber, payload_commitment: VidCommitment2) {
196        if self.calculations.contains_key(&view) {
197            return;
198        }
199        let Some(accumulator) = self.accumulators.get(&view) else {
200            return;
201        };
202        let shares = accumulator.shares.clone();
203        let common = accumulator.common.clone();
204        // Metadata comes from when we get the proposal, otherwise we can't reconstruct the payload
205        let Some(metadata) = accumulator.metadata.clone() else {
206            return;
207        };
208        let epoch = accumulator.epoch.unwrap_or(EpochNumber::genesis());
209        let task = self.tasks.spawn_blocking(move || {
210            let Ok(result) = AvidmGf2Scheme::recover(&common, &shares) else {
211                // TODO: Handle error
212                return Err(());
213            };
214            let payload = T::BlockPayload::from_bytes(&result, &metadata);
215            let tx_commitments = payload.transaction_commitments(&metadata);
216            Ok(VidReconstructOutput {
217                view,
218                epoch,
219                payload_commitment,
220                payload,
221                metadata,
222                tx_commitments,
223            })
224        });
225        self.calculations.insert(view, task);
226    }
227
228    pub fn gc(&mut self, view_number: ViewNumber) {
229        let keep = self.calculations.split_off(&view_number);
230        for handle in self.calculations.values_mut() {
231            handle.abort();
232        }
233        self.calculations = keep;
234        self.accumulators = self.accumulators.split_off(&view_number);
235    }
236
237    /// Mark `view` as already-reconstructed: drop accumulated shares, abort any
238    /// in-flight reconstruction task, and ignore later shares for this view.
239    pub fn mark_reconstructed(&mut self, view: ViewNumber) {
240        self.reconstructed.insert(view);
241        self.accumulators.remove(&view);
242        if let Some(handle) = self.calculations.remove(&view) {
243            handle.abort();
244        }
245    }
246}