hotshot_example_types/membership/
fetcher.rs1use std::{collections::BTreeMap, sync::Arc};
7
8use anyhow::Context;
9use async_broadcast::{Receiver, RecvError};
10use hotshot_types::{
11 data::Leaf2,
12 event::{Event, EventType},
13 message::{Message, MessageKind},
14 traits::{
15 block_contents::BlockHeader, leaf_fetcher_network::LeafFetcherNetwork,
16 node_implementation::NodeType,
17 },
18 vote::HasViewNumber,
19};
20use tokio::task::JoinHandle;
21use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
22
23use crate::storage_types::TestStorage;
24
25pub struct Leaf2Fetcher<TYPES: NodeType> {
26 pub network: Arc<dyn LeafFetcherNetwork<TYPES>>,
27 pub storage: TestStorage<TYPES>,
28 pub listener: Option<JoinHandle<()>>,
29 pub public_key: TYPES::SignatureKey,
30 pub network_receiver: Option<Receiver<Event<TYPES>>>,
31}
32
33impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
34 pub fn new(
35 network: Arc<dyn LeafFetcherNetwork<TYPES>>,
36 storage: TestStorage<TYPES>,
37 public_key: TYPES::SignatureKey,
38 ) -> Self {
39 Self {
40 network,
41 storage,
42 listener: None,
43 public_key,
44 network_receiver: None,
45 }
46 }
47
48 pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
49 let public_key = self.public_key.clone();
50 let storage = self.storage.clone();
51 let network = self.network.clone();
52
53 self.network_receiver = Some(network_receiver.clone());
54
55 let listener = tokio::spawn(async move {
56 loop {
57 match network_receiver.recv_direct().await {
58 Ok(Event {
59 view_number: view,
60 event: EventType::ExternalMessageReceived { sender: _, data },
61 }) => {
62 let (requested_height, requester): (u64, TYPES::SignatureKey) =
63 match bincode::deserialize(&data) {
64 Ok(message) => message,
65 Err(e) => {
66 tracing::debug!("Failed to deserialize message: {e:?}");
67 continue;
68 },
69 };
70
71 let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
72 .inner
73 .read()
74 .await
75 .proposals_wrapper
76 .values()
77 .map(|proposal| {
78 (
79 proposal.data.block_header().block_number(),
80 Leaf2::from_quorum_proposal(&proposal.data.clone()),
81 )
82 })
83 .collect();
84
85 let heights = leaves.keys().collect::<Vec<_>>();
86
87 let Some(leaf) = leaves.get(&requested_height) else {
88 tracing::error!(
89 "Block at height {requested_height} not found in storage.\n\n \
90 stored leaf heights: {heights:?}"
91 );
92 continue;
93 };
94
95 let leaf_response = Message {
96 sender: public_key.clone(),
97 kind: MessageKind::<TYPES>::External(
98 bincode::serialize(&leaf).expect("Failed to serialize leaf"),
99 ),
100 };
101
102 let serialized_leaf_response =
103 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
104 .expect("Failed to serialize leaf response");
105
106 if let Err(e) = network
107 .send_leaf_response(
108 view.u64().into(),
109 serialized_leaf_response,
110 requester,
111 )
112 .await
113 {
114 tracing::error!(
115 "Failed to send leaf response in test membership fetcher: {e}, \
116 requested height: {requested_height}"
117 );
118 };
119 },
120 Err(RecvError::Closed) => {
121 break;
122 },
123 _ => {
124 continue;
125 },
126 }
127 }
128 });
129
130 self.listener = Some(listener);
131 }
132
133 pub async fn fetch_leaf(
134 &self,
135 height: u64,
136 source: TYPES::SignatureKey,
137 ) -> anyhow::Result<Leaf2<TYPES>> {
138 let leaf_request = Message {
139 sender: self.public_key.clone(),
140 kind: MessageKind::<TYPES>::External(
141 bincode::serialize(&(height, self.public_key.clone()))
142 .expect("Failed to serialize leaf request"),
143 ),
144 };
145 let view = leaf_request.view_number();
146
147 let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
148 .storage
149 .inner
150 .read()
151 .await
152 .proposals_wrapper
153 .values()
154 .map(|proposal| {
155 (
156 proposal.data.block_header().block_number(),
157 Leaf2::from_quorum_proposal(&proposal.data.clone()),
158 )
159 })
160 .collect();
161
162 let heights = leaves.keys().collect::<Vec<_>>();
163
164 if let Some(leaf) = leaves.get(&height) {
165 return Ok(leaf.clone());
166 };
167 tracing::debug!(
168 "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
169 );
170
171 let mut network_receiver = self
172 .network_receiver
173 .clone()
174 .expect("Tried to fetch leaf before calling `set_external_channel`");
175
176 let serialized_leaf_request =
177 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
178 .expect("Failed to serialize leaf request");
179
180 if let Err(e) = self
181 .network
182 .send_leaf_request(view.u64().into(), serialized_leaf_request, source)
183 .await
184 {
185 tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
186 };
187
188 tokio::time::timeout(std::time::Duration::from_millis(200), async {
189 loop {
190 match network_receiver.recv_direct().await {
191 Ok(Event {
192 view_number: _,
193 event: EventType::ExternalMessageReceived { sender: _, data },
194 }) => {
195 let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
196 Ok(message) => message,
197 Err(e) => {
198 tracing::debug!("Failed to deserialize message: {e:?}");
199 continue;
200 },
201 };
202
203 if leaf.height() == height {
204 return Ok(leaf);
205 }
206 },
207 Err(RecvError::Closed) => {
208 break Err(anyhow::anyhow!(
209 "Failed to fetch leaf: network task receiver closed"
210 ));
211 },
212 _ => {
213 continue;
214 },
215 }
216 }
217 })
218 .await
219 .context("Leaf fetch timed out")?
220 }
221}