Skip to main content

hotshot_task_impls/
request.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, BTreeSet},
9    sync::{
10        Arc,
11        atomic::{AtomicBool, Ordering},
12    },
13    time::Duration,
14};
15
16use async_broadcast::{Receiver, Sender};
17use async_trait::async_trait;
18use hotshot_task::task::TaskState;
19use hotshot_types::{
20    consensus::OuterConsensus,
21    data::{EpochNumber, ViewNumber},
22    epoch_membership::EpochMembershipCoordinator,
23    simple_vote::HasEpoch,
24    traits::{
25        block_contents::BlockHeader,
26        network::{ConnectedNetwork, DataRequest, RequestKind},
27        node_implementation::{NodeImplementation, NodeType},
28        signature_key::SignatureKey,
29    },
30    utils::is_epoch_transition,
31    vote::HasViewNumber,
32};
33use hotshot_utils::anytrace::*;
34use rand::{seq::SliceRandom, thread_rng};
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::JoinHandle, time::sleep};
37use tracing::instrument;
38
39use crate::{events::HotShotEvent, helpers::broadcast_event};
40
/// How long a request task waits after sending a VID request to one DA member before it
/// re-checks whether the data arrived and, if not, sends the request to the next DA member.
pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500);
43
44/// Long running task which will request information after a proposal is received.
45/// The task will wait a it's `delay` and then send a request iteratively to peers
46/// for any data they don't have related to the proposal.  For now it's just requesting VID
47/// shares.
48pub struct NetworkRequestState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
49    /// Network to send requests over
50    /// The underlying network
51    pub network: Arc<I::Network>,
52
53    /// Consensus shared state so we can check if we've gotten the information
54    /// before sending a request
55    pub consensus: OuterConsensus<TYPES>,
56
57    /// Last seen view, we won't request for proposals before older than this view
58    pub view: ViewNumber,
59
60    /// Delay before requesting peers
61    pub delay: Duration,
62
63    /// Membership (Used here only for DA)
64    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
65
66    /// This nodes public key
67    pub public_key: TYPES::SignatureKey,
68
69    /// This nodes private/signing key, used to sign requests.
70    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
71
72    /// The node's id
73    pub id: u64,
74
75    /// A flag indicating that `HotShotEvent::Shutdown` has been received
76    pub shutdown_flag: Arc<AtomicBool>,
77
78    /// A flag indicating that `HotShotEvent::Shutdown` has been received
79    pub spawned_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
80
81    /// Number of blocks in an epoch, zero means there are no epochs
82    pub epoch_height: u64,
83}
84
85impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Drop for NetworkRequestState<TYPES, I> {
86    fn drop(&mut self) {
87        self.cancel_subtasks();
88    }
89}
90
/// Alias for the assembled signature type of `TYPES::SignatureKey`.
///
/// This is the type returned by `SignatureKey::sign` and attached to outgoing `DataRequest`s.
type Signature<TYPES> =
    <<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType;
94
95#[async_trait]
96impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequestState<TYPES, I> {
97    type Event = HotShotEvent<TYPES>;
98
99    #[instrument(skip_all, fields(id = self.id), name = "NetworkRequestState")]
100    async fn handle_event(
101        &mut self,
102        event: Arc<Self::Event>,
103        sender: &Sender<Arc<Self::Event>>,
104        _receiver: &Receiver<Arc<Self::Event>>,
105    ) -> Result<()> {
106        match event.as_ref() {
107            HotShotEvent::QuorumProposalValidated(proposal, _) => {
108                let prop_view = proposal.data.view_number();
109                let prop_epoch = proposal.data.epoch();
110
111                // Request VID share only if:
112                // 1. we are part of the current epoch or
113                // 2. we are part of the next epoch and this is a proposal for in transition.
114                let membership = self
115                    .membership_coordinator
116                    .stake_table_for_epoch(prop_epoch)?;
117                let mut target_epochs = BTreeSet::new();
118                if membership.has_stake(&self.public_key) {
119                    target_epochs.insert(prop_epoch);
120                }
121                if is_epoch_transition(
122                    proposal.data.block_header().block_number(),
123                    self.epoch_height,
124                ) && membership
125                    .next_epoch_stake_table()?
126                    .has_stake(&self.public_key)
127                {
128                    target_epochs.insert(prop_epoch.map(|e| e + 1));
129                }
130
131                ensure!(
132                    !target_epochs.is_empty(),
133                    "We don't belong to the current epoch and we don't belong to the next epoch. \
134                     Do not request VID share."
135                );
136
137                let consensus_reader = self.consensus.read().await;
138                let maybe_vid_share = consensus_reader
139                    .vid_shares()
140                    .get(&prop_view)
141                    .and_then(|shares| shares.get(&self.public_key));
142                // If we already have the VID shares for the next view, do nothing.
143                if prop_view >= self.view
144                    && (maybe_vid_share.is_none()
145                        || !target_epochs
146                            .iter()
147                            .all(|e| maybe_vid_share.unwrap().contains_key(e)))
148                {
149                    drop(consensus_reader);
150                    self.spawn_requests(prop_view, prop_epoch, sender, target_epochs)
151                        .await;
152                }
153                Ok(())
154            },
155            HotShotEvent::VidResponseRecv(sender_key, vid_proposal) => {
156                let view = vid_proposal.data.view_number();
157                let epoch = vid_proposal.data.epoch();
158
159                // Get the committee members for the view and the leader, if applicable
160                let membership_reader = self.membership_coordinator.membership_for_epoch(epoch)?;
161                let mut da_committee_for_view: BTreeSet<_> = membership_reader
162                    .da_committee_members(view)
163                    .cloned()
164                    .collect();
165                if let Ok(leader) = membership_reader.leader(view) {
166                    da_committee_for_view.insert(leader);
167                }
168                drop(membership_reader);
169
170                ensure!(
171                    self.spawned_tasks.contains_key(&view),
172                    info!("Received VidResponseRecv for view we didn't expect, view {view:?}")
173                );
174
175                ensure!(
176                    da_committee_for_view.contains(sender_key),
177                    warn!("Received VidResponseRecv from unexpected sender key {sender_key:?}")
178                );
179
180                ensure!(
181                    sender_key.validate(
182                        &vid_proposal.signature,
183                        vid_proposal.data.payload_commitment_ref()
184                    ),
185                    warn!("Received VidResponseRecv with invalid signature")
186                );
187
188                tracing::debug!("Received VidResponseRecv {vid_proposal:?}");
189                broadcast_event(
190                    Arc::new(HotShotEvent::VidShareRecv(
191                        sender_key.clone(),
192                        vid_proposal.clone(),
193                    )),
194                    sender,
195                )
196                .await;
197                Ok(())
198            },
199            HotShotEvent::ViewChange(view, _) => {
200                let view = *view;
201                if view > self.view {
202                    self.view = view;
203                }
204                // Clean old tasks' handles
205                self.spawned_tasks
206                    .range_mut(..self.view)
207                    .for_each(|(_, handles)| {
208                        handles.retain(|handle| !handle.is_finished());
209                    });
210                self.spawned_tasks
211                    .retain(|view, handles| view >= &self.view || !handles.is_empty());
212                Ok(())
213            },
214            _ => Ok(()),
215        }
216    }
217
218    fn cancel_subtasks(&mut self) {
219        self.shutdown_flag.store(true, Ordering::Relaxed);
220
221        while !self.spawned_tasks.is_empty() {
222            let Some((_, handles)) = self.spawned_tasks.pop_first() else {
223                break;
224            };
225
226            for handle in handles {
227                handle.abort();
228            }
229        }
230    }
231}
232
233impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I> {
234    /// Creates and signs the payload, then will create a request task
235    async fn spawn_requests(
236        &mut self,
237        view: ViewNumber,
238        prop_epoch: Option<EpochNumber>,
239        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
240        target_epochs: BTreeSet<Option<EpochNumber>>,
241    ) {
242        let request = RequestKind::Vid(view, self.public_key.clone());
243
244        // First sign the request for the VID shares.
245        if let Some(signature) = self.serialize_and_sign(&request) {
246            self.create_vid_request_task(
247                request,
248                signature,
249                sender.clone(),
250                view,
251                prop_epoch,
252                target_epochs,
253            )
254            .await;
255        }
256    }
257
258    /// Creates a task that will request a VID share from a DA member and wait for the `HotShotEvent::VidResponseRecv`event
259    /// If we get the VID disperse share, broadcast `HotShotEvent::VidShareRecv` and terminate task
260    async fn create_vid_request_task(
261        &mut self,
262        request: RequestKind<TYPES>,
263        signature: Signature<TYPES>,
264        sender: Sender<Arc<HotShotEvent<TYPES>>>,
265        view: ViewNumber,
266        prop_epoch: Option<EpochNumber>,
267        mut target_epochs: BTreeSet<Option<EpochNumber>>,
268    ) {
269        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
270        let network = Arc::clone(&self.network);
271        let shutdown_flag = Arc::clone(&self.shutdown_flag);
272        let delay = self.delay;
273        let public_key = self.public_key.clone();
274
275        // Get the committee members for the view and the leader, if applicable
276        let membership_reader = match self.membership_coordinator.membership_for_epoch(prop_epoch) {
277            Ok(m) => m,
278            Err(e) => {
279                tracing::warn!(e.message);
280                return;
281            },
282        };
283        // Get committee members for view
284        let mut recipients: Vec<TYPES::SignatureKey> = membership_reader
285            .da_committee_members(view)
286            .cloned()
287            .collect();
288
289        // Randomize the recipients so all replicas don't overload the same 1 recipients
290        // and so we don't implicitly rely on the same replica all the time.
291        recipients.shuffle(&mut thread_rng());
292
293        // prepare request
294        let data_request = DataRequest::<TYPES> {
295            request,
296            view,
297            signature,
298        };
299        let my_id = self.id;
300        let handle: JoinHandle<()> = spawn(async move {
301            // Do the delay only if primary is up and then start sending
302            if !network.is_primary_down() {
303                sleep(delay).await;
304            }
305
306            let mut recipients_it = recipients.iter();
307            // First check if we got the data before continuing
308            while !Self::cancel_vid_request_task(
309                &consensus,
310                &public_key,
311                &view,
312                &shutdown_flag,
313                my_id,
314                &mut target_epochs,
315            )
316            .await
317            {
318                // Cycle da members we send the request to each time
319                if let Some(recipient) = recipients_it.next() {
320                    if *recipient == public_key {
321                        // no need to send a message to ourselves.
322                        // just check for the data at start of loop in `cancel_vid_request_task`
323                        continue;
324                    }
325                    tracing::debug!("Sending VidRequestSend {data_request:?}, my id {my_id:?}");
326                    // First send request to a random DA member for the view
327                    broadcast_event(
328                        HotShotEvent::VidRequestSend(
329                            data_request.clone(),
330                            public_key.clone(),
331                            recipient.clone(),
332                        )
333                        .into(),
334                        &sender,
335                    )
336                    .await;
337                    // Wait before sending the request to the next recipient.
338                    sleep(REQUEST_TIMEOUT).await;
339                } else {
340                    // This shouldn't be possible `recipients_it.next()` should clone original and start over if `None`
341                    tracing::warn!(
342                        "Sent VID request to all available DA members and got no response for \
343                         view: {view:?}, my id: {my_id:?}"
344                    );
345                    return;
346                }
347            }
348        });
349        self.spawned_tasks.entry(view).or_default().push(handle);
350    }
351
352    /// Returns true if we got the data we wanted, a shutdown event was received, or the view has moved on.
353    async fn cancel_vid_request_task(
354        consensus: &OuterConsensus<TYPES>,
355        public_key: &<TYPES as NodeType>::SignatureKey,
356        view: &ViewNumber,
357        shutdown_flag: &Arc<AtomicBool>,
358        id: u64,
359        target_epochs: &mut BTreeSet<Option<EpochNumber>>,
360    ) -> bool {
361        let consensus_reader = consensus.read().await;
362
363        let maybe_vid_shares = consensus_reader
364            .vid_shares()
365            .get(view)
366            .and_then(|key_map| key_map.get(public_key));
367        if let Some(vid_shares) = maybe_vid_shares {
368            tracing::debug!("Send own vid share: {vid_shares:?}, my id {id:?}");
369            for vid_share in vid_shares.values() {
370                target_epochs.remove(&vid_share.data.target_epoch());
371            }
372        }
373        let cancel = shutdown_flag.load(Ordering::Relaxed)
374            || consensus_reader.cur_view() > *view
375            || target_epochs.is_empty();
376        if cancel {
377            tracing::debug!(
378                "Canceling vid request for view {:?}, cur view is {:?}, my id {:?}",
379                view,
380                consensus_reader.cur_view(),
381                id,
382            );
383        }
384        cancel
385    }
386
387    /// Sign the serialized version of the request
388    fn serialize_and_sign(&self, request: &RequestKind<TYPES>) -> Option<Signature<TYPES>> {
389        let Ok(data) = bincode::serialize(&request) else {
390            tracing::error!("Failed to serialize request!");
391            return None;
392        };
393        let Ok(signature) = TYPES::SignatureKey::sign(&self.private_key, &Sha256::digest(data))
394        else {
395            tracing::error!("Failed to sign Data Request");
396            return None;
397        };
398        Some(signature)
399    }
400}