Skip to main content

hotshot_example_types/membership/
fetcher.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6use std::{collections::BTreeMap, sync::Arc};
7
8use anyhow::Context;
9use async_broadcast::{Receiver, RecvError};
10use hotshot_types::{
11    data::Leaf2,
12    event::{Event, EventType},
13    message::{Message, MessageKind},
14    traits::{
15        block_contents::BlockHeader, leaf_fetcher_network::LeafFetcherNetwork,
16        node_implementation::NodeType,
17    },
18    vote::HasViewNumber,
19};
20use tokio::task::JoinHandle;
21use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
22
23use crate::storage_types::TestStorage;
24
25pub struct Leaf2Fetcher<TYPES: NodeType> {
26    pub network: Arc<dyn LeafFetcherNetwork<TYPES>>,
27    pub storage: TestStorage<TYPES>,
28    pub listener: Option<JoinHandle<()>>,
29    pub public_key: TYPES::SignatureKey,
30    pub network_receiver: Option<Receiver<Event<TYPES>>>,
31}
32
33impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
34    pub fn new(
35        network: Arc<dyn LeafFetcherNetwork<TYPES>>,
36        storage: TestStorage<TYPES>,
37        public_key: TYPES::SignatureKey,
38    ) -> Self {
39        Self {
40            network,
41            storage,
42            listener: None,
43            public_key,
44            network_receiver: None,
45        }
46    }
47
48    pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
49        let public_key = self.public_key.clone();
50        let storage = self.storage.clone();
51        let network = self.network.clone();
52
53        self.network_receiver = Some(network_receiver.clone());
54
55        let listener = tokio::spawn(async move {
56            loop {
57                match network_receiver.recv_direct().await {
58                    Ok(Event {
59                        view_number: view,
60                        event: EventType::ExternalMessageReceived { sender: _, data },
61                    }) => {
62                        let (requested_height, requester): (u64, TYPES::SignatureKey) =
63                            match bincode::deserialize(&data) {
64                                Ok(message) => message,
65                                Err(e) => {
66                                    tracing::debug!("Failed to deserialize message: {e:?}");
67                                    continue;
68                                },
69                            };
70
71                        let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
72                            .inner
73                            .read()
74                            .await
75                            .proposals_wrapper
76                            .values()
77                            .map(|proposal| {
78                                (
79                                    proposal.data.block_header().block_number(),
80                                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
81                                )
82                            })
83                            .collect();
84
85                        let heights = leaves.keys().collect::<Vec<_>>();
86
87                        let Some(leaf) = leaves.get(&requested_height) else {
88                            tracing::error!(
89                                "Block at height {requested_height} not found in storage.\n\n \
90                                 stored leaf heights: {heights:?}"
91                            );
92                            continue;
93                        };
94
95                        let leaf_response = Message {
96                            sender: public_key.clone(),
97                            kind: MessageKind::<TYPES>::External(
98                                bincode::serialize(&leaf).expect("Failed to serialize leaf"),
99                            ),
100                        };
101
102                        let serialized_leaf_response =
103                            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
104                                .expect("Failed to serialize leaf response");
105
106                        if let Err(e) = network
107                            .send_leaf_response(
108                                view.u64().into(),
109                                serialized_leaf_response,
110                                requester,
111                            )
112                            .await
113                        {
114                            tracing::error!(
115                                "Failed to send leaf response in test membership fetcher: {e}, \
116                                 requested height: {requested_height}"
117                            );
118                        };
119                    },
120                    Err(RecvError::Closed) => {
121                        break;
122                    },
123                    _ => {
124                        continue;
125                    },
126                }
127            }
128        });
129
130        self.listener = Some(listener);
131    }
132
133    pub async fn fetch_leaf(
134        &self,
135        height: u64,
136        source: TYPES::SignatureKey,
137    ) -> anyhow::Result<Leaf2<TYPES>> {
138        let leaf_request = Message {
139            sender: self.public_key.clone(),
140            kind: MessageKind::<TYPES>::External(
141                bincode::serialize(&(height, self.public_key.clone()))
142                    .expect("Failed to serialize leaf request"),
143            ),
144        };
145        let view = leaf_request.view_number();
146
147        let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
148            .storage
149            .inner
150            .read()
151            .await
152            .proposals_wrapper
153            .values()
154            .map(|proposal| {
155                (
156                    proposal.data.block_header().block_number(),
157                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
158                )
159            })
160            .collect();
161
162        let heights = leaves.keys().collect::<Vec<_>>();
163
164        if let Some(leaf) = leaves.get(&height) {
165            return Ok(leaf.clone());
166        };
167        tracing::debug!(
168            "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
169        );
170
171        let mut network_receiver = self
172            .network_receiver
173            .clone()
174            .expect("Tried to fetch leaf before calling `set_external_channel`");
175
176        let serialized_leaf_request =
177            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
178                .expect("Failed to serialize leaf request");
179
180        if let Err(e) = self
181            .network
182            .send_leaf_request(view.u64().into(), serialized_leaf_request, source)
183            .await
184        {
185            tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
186        };
187
188        tokio::time::timeout(std::time::Duration::from_millis(200), async {
189            loop {
190                match network_receiver.recv_direct().await {
191                    Ok(Event {
192                        view_number: _,
193                        event: EventType::ExternalMessageReceived { sender: _, data },
194                    }) => {
195                        let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
196                            Ok(message) => message,
197                            Err(e) => {
198                                tracing::debug!("Failed to deserialize message: {e:?}");
199                                continue;
200                            },
201                        };
202
203                        if leaf.height() == height {
204                            return Ok(leaf);
205                        }
206                    },
207                    Err(RecvError::Closed) => {
208                        break Err(anyhow::anyhow!(
209                            "Failed to fetch leaf: network task receiver closed"
210                        ));
211                    },
212                    _ => {
213                        continue;
214                    },
215                }
216            }
217        })
218        .await
219        .context("Leaf fetch timed out")?
220    }
221}