1use std::{cmp::Ordering, future::IntoFuture, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use futures::{FutureExt, future::BoxFuture};
22use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
23
24use super::{
25 AvailabilityProvider, Fetcher, block::fetch_block_with_header, leaf::fetch_leaf_with_callbacks,
26 vid::fetch_vid_common_with_header,
27};
28use crate::{
29 Header, Payload, QueryError, QueryResult,
30 availability::{BlockId, QueryableHeader, QueryablePayload},
31 data_source::{
32 fetching::{
33 Fetchable, HeaderQueryData, LeafQueryData, Notifiers,
34 block::fetch_block_range_with_headers,
35 leaf::{RangeRequest, fetch_leaf_range_with_callbacks},
36 vid::fetch_vid_common_range_with_headers,
37 },
38 storage::{
39 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
40 pruning::PrunedHeightStorage,
41 },
42 update::VersionedDataSource,
43 },
44 fetching::NonEmptyRange,
45};
46
47impl<Types: NodeType> From<LeafQueryData<Types>> for HeaderQueryData<Types> {
48 fn from(leaf: LeafQueryData<Types>) -> Self {
49 let header = leaf.header().clone();
50
51 Self { header }
52 }
53}
54
55fn satisfies_header_req_from_leaf<Types>(leaf: &LeafQueryData<Types>, req: BlockId<Types>) -> bool
56where
57 Types: NodeType,
58 Header<Types>: QueryableHeader<Types>,
59 Payload<Types>: QueryablePayload<Types>,
60{
61 HeaderQueryData::satisfies(&HeaderQueryData::new(leaf.header().clone()), req)
62}
63
64#[async_trait]
65impl<Types> Fetchable<Types> for HeaderQueryData<Types>
66where
67 Types: NodeType,
68 Header<Types>: QueryableHeader<Types>,
69 Payload<Types>: QueryablePayload<Types>,
70{
71 type Request = BlockId<Types>;
72
73 fn satisfies(&self, req: Self::Request) -> bool {
74 let header = self.header();
75 match req {
76 BlockId::Number(n) => header.block_number() as usize == n,
77 BlockId::Hash(h) => header.commit() == h,
78 BlockId::PayloadHash(h) => header.payload_commitment() == h,
79 }
80 }
81
82 async fn passive_fetch(
83 notifiers: &Notifiers<Types>,
84 req: Self::Request,
85 ) -> BoxFuture<'static, Option<Self>> {
86 notifiers
87 .leaf
88 .wait_for(move |leaf| satisfies_header_req_from_leaf(leaf, req))
89 .await
90 .into_future()
91 .map(|leaf| leaf.map(HeaderQueryData::from))
92 .boxed()
93 }
94
95 async fn active_fetch<S, P>(
96 tx: &mut impl AvailabilityStorage<Types>,
97 fetcher: Arc<Fetcher<Types, S, P>>,
98 req: Self::Request,
99 ) -> anyhow::Result<()>
100 where
101 S: VersionedDataSource + 'static,
102 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
103 for<'a> S::ReadOnly<'a>:
104 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
105 P: AvailabilityProvider<Types>,
106 {
107 fetch_header_and_then(
111 tx,
112 req,
113 HeaderCallback::Payload {
114 fetcher: fetcher.clone(),
115 },
116 )
117 .await
118 }
119
120 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
121 where
122 S: AvailabilityStorage<Types>,
123 {
124 storage.get_header(req).await.map(|header| Self { header })
125 }
126}
127
128#[derive(Derivative)]
129#[derivative(Debug(bound = ""))]
130pub(super) enum HeaderCallback<Types, S, P>
131where
132 Types: NodeType,
133{
134 Payload {
136 #[derivative(Debug = "ignore")]
137 fetcher: Arc<Fetcher<Types, S, P>>,
138 },
139 VidCommon {
141 #[derivative(Debug = "ignore")]
142 fetcher: Arc<Fetcher<Types, S, P>>,
143 },
144}
145
146impl<Types: NodeType, S, P> PartialEq for HeaderCallback<Types, S, P> {
147 fn eq(&self, other: &Self) -> bool {
148 self.cmp(other).is_eq()
149 }
150}
151
152impl<Types: NodeType, S, P> Eq for HeaderCallback<Types, S, P> {}
153
154impl<Types: NodeType, S, P> PartialOrd for HeaderCallback<Types, S, P> {
155 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
156 Some(self.cmp(other))
157 }
158}
159
160impl<Types: NodeType, S, P> Ord for HeaderCallback<Types, S, P> {
161 fn cmp(&self, other: &Self) -> Ordering {
162 match (self, other) {
164 (Self::Payload { .. }, Self::VidCommon { .. }) => Ordering::Less,
165 (Self::VidCommon { .. }, Self::Payload { .. }) => Ordering::Greater,
166 _ => Ordering::Equal,
167 }
168 }
169}
170
171impl<Types, S, P> HeaderCallback<Types, S, P>
172where
173 Types: NodeType,
174 Header<Types>: QueryableHeader<Types>,
175 Payload<Types>: QueryablePayload<Types>,
176 S: VersionedDataSource + 'static,
177 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
178 P: AvailabilityProvider<Types>,
179{
180 fn fetcher(&self) -> Arc<Fetcher<Types, S, P>> {
181 match self {
182 Self::Payload { fetcher } => fetcher.clone(),
183 Self::VidCommon { fetcher } => fetcher.clone(),
184 }
185 }
186
187 pub(super) fn run(self, header: Header<Types>) {
188 match self {
189 Self::Payload { fetcher } => {
190 tracing::info!(
191 "fetched leaf {}, will now fetch payload",
192 header.block_number()
193 );
194 fetch_block_with_header(fetcher, header);
195 },
196 Self::VidCommon { fetcher } => {
197 tracing::info!(
198 "fetched leaf {}, will now fetch VID common",
199 header.block_number()
200 );
201 fetch_vid_common_with_header(fetcher, header);
202 },
203 }
204 }
205
206 pub(super) fn run_range(self, headers: NonEmptyRange<Header<Types>>) {
207 match self {
208 Self::Payload { fetcher } => {
209 tracing::info!(
210 "fetched leaves {}..{}, will now fetch payload",
211 headers.start(),
212 headers.end(),
213 );
214 fetch_block_range_with_headers(fetcher, headers);
215 },
216 Self::VidCommon { fetcher } => {
217 tracing::info!(
218 "fetched leaves {}..{}, will now fetch VID common",
219 headers.start(),
220 headers.end(),
221 );
222 fetch_vid_common_range_with_headers(fetcher, headers);
223 },
224 }
225 }
226}
227
228pub(super) async fn fetch_header_and_then<Types, S, P>(
229 tx: &mut impl AvailabilityStorage<Types>,
230 req: BlockId<Types>,
231 callback: HeaderCallback<Types, S, P>,
232) -> anyhow::Result<()>
233where
234 Types: NodeType,
235 Header<Types>: QueryableHeader<Types>,
236 Payload<Types>: QueryablePayload<Types>,
237 S: VersionedDataSource + 'static,
238 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
239 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
240 P: AvailabilityProvider<Types>,
241{
242 match tx.get_header(req).await {
251 Ok(header) => {
252 callback.run(header);
253 return Ok(());
254 },
255 Err(QueryError::Missing | QueryError::NotFound) => {
256 tracing::debug!(?req, "header not available locally; trying fetch");
259 },
260 Err(QueryError::Error { message }) => {
261 bail!("failed to fetch header for block {req:?}: {message}");
264 },
265 }
266
267 match req {
272 BlockId::Number(n) => {
273 fetch_leaf_with_callbacks(tx, callback.fetcher(), n.into(), [callback.into()]).await?;
274 },
275 BlockId::Hash(h) => {
276 tracing::debug!("not fetching unknown block {h}");
279 },
280 BlockId::PayloadHash(h) => {
281 tracing::debug!("not fetching block with unknown payload {h}");
283 },
284 }
285
286 Ok(())
287}
288
289pub(super) async fn fetch_header_range_and_then<Types, S, P>(
290 tx: &mut impl AvailabilityStorage<Types>,
291 req: RangeRequest,
292 callback: HeaderCallback<Types, S, P>,
293) -> anyhow::Result<()>
294where
295 Types: NodeType,
296 Header<Types>: QueryableHeader<Types>,
297 Payload<Types>: QueryablePayload<Types>,
298 S: VersionedDataSource + 'static,
299 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
300 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
301 P: AvailabilityProvider<Types>,
302{
303 match <NonEmptyRange<LeafQueryData<Types>>>::load(tx, req).await {
305 Ok(leaves) => {
306 callback.run_range(leaves.as_ref_cloned());
307 return Ok(());
308 },
309 Err(QueryError::Missing | QueryError::NotFound) => {
310 tracing::debug!(?req, "headers not available locally; trying fetch");
313 },
314 Err(QueryError::Error { message }) => {
315 bail!("failed to fetch headers for range {req:?}: {message}");
318 },
319 }
320
321 fetch_leaf_range_with_callbacks(tx, callback.fetcher(), req, [callback.into()]).await?;
323
324 Ok(())
325}