1use std::collections::{BTreeMap, BTreeSet, HashSet};
2
3use committable::Commitment;
4use hotshot::traits::BlockPayload;
5use hotshot_types::{
6 data::{EpochNumber, VidCommitment2, VidDisperse2, VidDisperseShare2, ViewNumber},
7 epoch_membership::EpochMembershipCoordinator,
8 traits::node_implementation::NodeType,
9 vid::avidm_gf2::{AvidmGf2Common, AvidmGf2Scheme, AvidmGf2Share},
10};
11use tokio::task::{AbortHandle, JoinSet};
12
13pub struct VidDisperseOutput<T: NodeType> {
14 pub view: ViewNumber,
15 pub payload_commitment: VidCommitment2,
16 pub disperse: VidDisperse2<T>,
17}
18
19pub struct VidReconstructOutput<T: NodeType> {
20 pub view: ViewNumber,
21 pub epoch: EpochNumber,
22 pub payload_commitment: VidCommitment2,
23 pub payload: T::BlockPayload,
24 pub metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
25 pub tx_commitments: Vec<Commitment<T::Transaction>>,
26}
27
28#[derive(Clone, Eq, PartialEq, Debug)]
29pub struct VidDisperseRequest<T: NodeType> {
30 pub view: ViewNumber,
31 pub epoch: EpochNumber,
32 pub block: T::BlockPayload,
33 pub metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
34}
35
36pub struct VidDisperser<T: NodeType> {
37 calculations: BTreeMap<ViewNumber, AbortHandle>,
38 epoch_membership_coordinator: EpochMembershipCoordinator<T>,
39 tasks: JoinSet<Result<VidDisperseOutput<T>, ()>>,
40}
41
42impl<T: NodeType> VidDisperser<T> {
43 pub fn new(epoch_membership_coordinator: EpochMembershipCoordinator<T>) -> Self {
44 Self {
45 calculations: BTreeMap::new(),
46 epoch_membership_coordinator,
47 tasks: JoinSet::new(),
48 }
49 }
50
51 pub fn request_vid_disperse(&mut self, vid_disperse_request: VidDisperseRequest<T>) {
52 let view = vid_disperse_request.view;
53 if self.calculations.contains_key(&view) {
54 return;
55 }
56 let handle = self.tasks.spawn(Self::handle_vid_disperse_request(
57 self.epoch_membership_coordinator.clone(),
58 vid_disperse_request,
59 ));
60 self.calculations.insert(view, handle);
61 }
62
63 pub async fn next(&mut self) -> Option<Result<VidDisperseOutput<T>, ()>> {
64 loop {
65 match self.tasks.join_next().await {
66 Some(Ok(result)) => return Some(result),
67 Some(Err(_)) => continue,
68 None => return None,
69 }
70 }
71 }
72
73 async fn handle_vid_disperse_request(
74 epoch_membership_coordinator: EpochMembershipCoordinator<T>,
75 vid_disperse_request: VidDisperseRequest<T>,
76 ) -> Result<VidDisperseOutput<T>, ()> {
77 let Ok((disperse, _duration)) = VidDisperse2::calculate_vid_disperse(
78 &vid_disperse_request.block,
79 &epoch_membership_coordinator,
80 vid_disperse_request.view,
81 Some(vid_disperse_request.epoch),
82 Some(vid_disperse_request.epoch),
83 &vid_disperse_request.metadata,
84 )
85 .await
86 else {
87 return Err(());
89 };
90 Ok(VidDisperseOutput {
91 view: vid_disperse_request.view,
92 payload_commitment: disperse.payload_commitment,
93 disperse,
94 })
95 }
96 pub fn gc(&mut self, view_number: ViewNumber) {
97 let keep = self.calculations.split_off(&view_number);
98 for handle in self.calculations.values_mut() {
99 handle.abort();
100 }
101 self.calculations = keep;
102 }
103}
104
105pub(crate) struct VidShareAccumulator<T: NodeType> {
106 shares: Vec<AvidmGf2Share>,
107 accumulated_weight: usize,
108 seen_keys: HashSet<T::SignatureKey>,
109 common: AvidmGf2Common,
110 metadata: Option<<T::BlockPayload as BlockPayload<T>>::Metadata>,
111 epoch: Option<EpochNumber>,
112}
113
114impl<T: NodeType> VidShareAccumulator<T> {
115 fn has_enough_shares(&self) -> bool {
116 self.accumulated_weight >= self.common.param.recovery_threshold
117 }
118}
119
120#[derive(Default)]
121pub struct VidReconstructor<T: NodeType> {
122 accumulators: BTreeMap<ViewNumber, VidShareAccumulator<T>>,
123 reconstructed: BTreeSet<ViewNumber>,
124 tasks: JoinSet<Result<VidReconstructOutput<T>, ()>>,
125 calculations: BTreeMap<ViewNumber, AbortHandle>,
126}
127
128impl<T: NodeType> VidReconstructor<T> {
129 pub fn new() -> Self {
130 Self {
131 accumulators: BTreeMap::new(),
132 reconstructed: BTreeSet::new(),
133 tasks: JoinSet::new(),
134 calculations: BTreeMap::new(),
135 }
136 }
137
138 pub(crate) fn handle_vid_share<M>(&mut self, share: VidDisperseShare2<T>, metadata: M)
139 where
140 M: Into<Option<<T::BlockPayload as BlockPayload<T>>::Metadata>>,
141 {
142 let view = share.view_number;
143 if self.reconstructed.contains(&view) {
144 return;
145 }
146 let payload_commitment = share.payload_commitment;
147 let recipient_key = share.recipient_key.clone();
148 let weight = share.share.weight();
149 let metadata = metadata.into();
150 let share_epoch = share.epoch;
151 let accumulator = self
152 .accumulators
153 .entry(view)
154 .or_insert_with(|| VidShareAccumulator {
155 shares: Vec::new(),
156 accumulated_weight: 0,
157 seen_keys: HashSet::new(),
158 common: share.common.clone(),
159 metadata: None,
160 epoch: share_epoch,
161 });
162 if accumulator.metadata.is_none()
163 && let Some(m) = metadata
164 {
165 accumulator.metadata = Some(m)
166 }
167 if accumulator.seen_keys.insert(recipient_key) {
168 accumulator.accumulated_weight += weight;
169 accumulator.shares.push(share.share);
170 }
171 if accumulator.has_enough_shares() {
172 self.try_reconstruct(view, payload_commitment);
173 }
174 }
175
176 pub async fn next(&mut self) -> Option<Result<VidReconstructOutput<T>, ()>> {
177 loop {
178 match self.tasks.join_next().await {
179 Some(Ok(Ok(out))) => {
180 self.calculations.remove(&out.view);
181 self.accumulators.remove(&out.view);
182 self.reconstructed.insert(out.view);
183 return Some(Ok(out));
184 },
185 Some(Ok(Err(()))) => {
186 return Some(Err(()));
188 },
189 Some(Err(_)) => continue,
190 None => return None,
191 }
192 }
193 }
194
195 fn try_reconstruct(&mut self, view: ViewNumber, payload_commitment: VidCommitment2) {
196 if self.calculations.contains_key(&view) {
197 return;
198 }
199 let Some(accumulator) = self.accumulators.get(&view) else {
200 return;
201 };
202 let shares = accumulator.shares.clone();
203 let common = accumulator.common.clone();
204 let Some(metadata) = accumulator.metadata.clone() else {
206 return;
207 };
208 let epoch = accumulator.epoch.unwrap_or(EpochNumber::genesis());
209 let task = self.tasks.spawn_blocking(move || {
210 let Ok(result) = AvidmGf2Scheme::recover(&common, &shares) else {
211 return Err(());
213 };
214 let payload = T::BlockPayload::from_bytes(&result, &metadata);
215 let tx_commitments = payload.transaction_commitments(&metadata);
216 Ok(VidReconstructOutput {
217 view,
218 epoch,
219 payload_commitment,
220 payload,
221 metadata,
222 tx_commitments,
223 })
224 });
225 self.calculations.insert(view, task);
226 }
227
228 pub fn gc(&mut self, view_number: ViewNumber) {
229 let keep = self.calculations.split_off(&view_number);
230 for handle in self.calculations.values_mut() {
231 handle.abort();
232 }
233 self.calculations = keep;
234 self.accumulators = self.accumulators.split_off(&view_number);
235 }
236
237 pub fn mark_reconstructed(&mut self, view: ViewNumber) {
240 self.reconstructed.insert(view);
241 self.accumulators.remove(&view);
242 if let Some(handle) = self.calculations.remove(&view) {
243 handle.abort();
244 }
245 }
246}