1use std::{cmp::Ordering, future::IntoFuture, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use futures::{FutureExt, future::BoxFuture};
22use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
23
24use super::{
25 AvailabilityProvider, Fetcher, block::fetch_block_with_header, leaf::fetch_leaf_with_callbacks,
26 vid::fetch_vid_common_with_header,
27};
28use crate::{
29 Header, Payload, QueryError, QueryResult,
30 availability::{BlockId, QueryableHeader, QueryablePayload},
31 data_source::{
32 fetching::{
33 Fetchable, HeaderQueryData, LeafQueryData, Notifiers,
34 block::fetch_block_range,
35 leaf::{RangeRequest, fetch_leaf_range_with_callbacks},
36 vid::fetch_vid_common_range,
37 },
38 storage::{
39 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
40 pruning::PrunedHeightStorage,
41 },
42 update::VersionedDataSource,
43 },
44 fetching::NonEmptyRange,
45};
46
47fn satisfies_header_req_from_leaf<Types>(leaf: &LeafQueryData<Types>, req: BlockId<Types>) -> bool
48where
49 Types: NodeType,
50 Header<Types>: QueryableHeader<Types>,
51 Payload<Types>: QueryablePayload<Types>,
52{
53 HeaderQueryData::satisfies(&HeaderQueryData::new(leaf.header().clone()), req)
54}
55
56#[async_trait]
57impl<Types> Fetchable<Types> for HeaderQueryData<Types>
58where
59 Types: NodeType,
60 Header<Types>: QueryableHeader<Types>,
61 Payload<Types>: QueryablePayload<Types>,
62{
63 type Request = BlockId<Types>;
64
65 fn satisfies(&self, req: Self::Request) -> bool {
66 let header = self.header();
67 match req {
68 BlockId::Number(n) => header.block_number() as usize == n,
69 BlockId::Hash(h) => header.commit() == h,
70 BlockId::PayloadHash(h) => header.payload_commitment() == h,
71 }
72 }
73
74 async fn passive_fetch(
75 notifiers: &Notifiers<Types>,
76 req: Self::Request,
77 ) -> BoxFuture<'static, Option<Self>> {
78 notifiers
79 .leaf
80 .wait_for(move |leaf| satisfies_header_req_from_leaf(leaf, req))
81 .await
82 .into_future()
83 .map(|leaf| leaf.map(HeaderQueryData::from))
84 .boxed()
85 }
86
87 async fn active_fetch<S, P>(
88 tx: &mut impl AvailabilityStorage<Types>,
89 fetcher: Arc<Fetcher<Types, S, P>>,
90 req: Self::Request,
91 ) -> anyhow::Result<()>
92 where
93 S: VersionedDataSource + 'static,
94 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
95 for<'a> S::ReadOnly<'a>:
96 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
97 P: AvailabilityProvider<Types>,
98 {
99 fetch_header_and_then(
103 tx,
104 req,
105 HeaderCallback::Payload {
106 fetcher: fetcher.clone(),
107 },
108 )
109 .await
110 }
111
112 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
113 where
114 S: AvailabilityStorage<Types>,
115 {
116 storage.get_header(req).await.map(|header| Self { header })
117 }
118}
119
120#[derive(Derivative)]
121#[derivative(Debug(bound = ""))]
122pub(super) enum HeaderCallback<Types, S, P>
123where
124 Types: NodeType,
125{
126 Payload {
128 #[derivative(Debug = "ignore")]
129 fetcher: Arc<Fetcher<Types, S, P>>,
130 },
131 VidCommon {
133 #[derivative(Debug = "ignore")]
134 fetcher: Arc<Fetcher<Types, S, P>>,
135 },
136}
137
138impl<Types: NodeType, S, P> PartialEq for HeaderCallback<Types, S, P> {
139 fn eq(&self, other: &Self) -> bool {
140 self.cmp(other).is_eq()
141 }
142}
143
144impl<Types: NodeType, S, P> Eq for HeaderCallback<Types, S, P> {}
145
146impl<Types: NodeType, S, P> PartialOrd for HeaderCallback<Types, S, P> {
147 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
148 Some(self.cmp(other))
149 }
150}
151
152impl<Types: NodeType, S, P> Ord for HeaderCallback<Types, S, P> {
153 fn cmp(&self, other: &Self) -> Ordering {
154 match (self, other) {
156 (Self::Payload { .. }, Self::VidCommon { .. }) => Ordering::Less,
157 (Self::VidCommon { .. }, Self::Payload { .. }) => Ordering::Greater,
158 _ => Ordering::Equal,
159 }
160 }
161}
162
163impl<Types, S, P> HeaderCallback<Types, S, P>
164where
165 Types: NodeType,
166 Header<Types>: QueryableHeader<Types>,
167 Payload<Types>: QueryablePayload<Types>,
168 S: VersionedDataSource + 'static,
169 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
170 P: AvailabilityProvider<Types>,
171{
172 fn fetcher(&self) -> Arc<Fetcher<Types, S, P>> {
173 match self {
174 Self::Payload { fetcher } => fetcher.clone(),
175 Self::VidCommon { fetcher } => fetcher.clone(),
176 }
177 }
178
179 pub(super) fn run(self, header: Header<Types>) {
180 match self {
181 Self::Payload { fetcher } => {
182 tracing::info!(
183 "fetched leaf {}, will now fetch payload",
184 header.block_number()
185 );
186 fetch_block_with_header(fetcher, header);
187 },
188 Self::VidCommon { fetcher } => {
189 tracing::info!(
190 "fetched leaf {}, will now fetch VID common",
191 header.block_number()
192 );
193 fetch_vid_common_with_header(fetcher, header);
194 },
195 }
196 }
197
198 pub(super) fn run_range(self, start: u64, end: u64) {
199 match self {
200 Self::Payload { fetcher } => {
201 tracing::info!("fetched leaves {start}..{end}, will now fetch payload",);
202 fetch_block_range(fetcher, start, end);
203 },
204 Self::VidCommon { fetcher } => {
205 tracing::info!("fetched leaves {start}..{end}, will now fetch VID common",);
206 fetch_vid_common_range(fetcher, start, end);
207 },
208 }
209 }
210}
211
212pub(super) async fn fetch_header_and_then<Types, S, P>(
213 tx: &mut impl AvailabilityStorage<Types>,
214 req: BlockId<Types>,
215 callback: HeaderCallback<Types, S, P>,
216) -> anyhow::Result<()>
217where
218 Types: NodeType,
219 Header<Types>: QueryableHeader<Types>,
220 Payload<Types>: QueryablePayload<Types>,
221 S: VersionedDataSource + 'static,
222 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
223 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
224 P: AvailabilityProvider<Types>,
225{
226 match tx.get_header(req).await {
235 Ok(header) => {
236 callback.run(header);
237 return Ok(());
238 },
239 Err(QueryError::Missing | QueryError::NotFound) => {
240 tracing::debug!(?req, "header not available locally; trying fetch");
243 },
244 Err(QueryError::Error { message }) => {
245 bail!("failed to fetch header for block {req:?}: {message}");
248 },
249 }
250
251 match req {
256 BlockId::Number(n) => {
257 fetch_leaf_with_callbacks(callback.fetcher(), n.into(), [callback.into()]).await?;
258 },
259 BlockId::Hash(h) => {
260 tracing::debug!("not fetching unknown block {h}");
263 },
264 BlockId::PayloadHash(h) => {
265 tracing::debug!("not fetching block with unknown payload {h}");
267 },
268 }
269
270 Ok(())
271}
272
273pub(super) async fn fetch_header_range_and_then<Types, S, P>(
274 tx: &mut impl AvailabilityStorage<Types>,
275 req: RangeRequest,
276 callback: HeaderCallback<Types, S, P>,
277) -> anyhow::Result<()>
278where
279 Types: NodeType,
280 Header<Types>: QueryableHeader<Types>,
281 Payload<Types>: QueryablePayload<Types>,
282 S: VersionedDataSource + 'static,
283 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
284 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
285 P: AvailabilityProvider<Types>,
286{
287 match <NonEmptyRange<LeafQueryData<Types>>>::load(tx, req).await {
290 Ok(_) => {
291 callback.run_range(req.start, req.end);
292 return Ok(());
293 },
294 Err(QueryError::Missing | QueryError::NotFound) => {
295 tracing::debug!(?req, "headers not available locally; trying fetch");
298 },
299 Err(QueryError::Error { message }) => {
300 bail!("failed to fetch headers for range {req:?}: {message}");
303 },
304 }
305
306 fetch_leaf_range_with_callbacks(callback.fetcher(), req, [callback.into()]).await?;
308
309 Ok(())
310}