espresso_node/api/endpoints/
availability.rs

1use std::time::Duration;
2
3use espresso_types::{NamespaceId, NsProof, PubKey, StateCertQueryDataV1, StateCertQueryDataV2};
4use futures::{
5    future::{FutureExt, TryFutureExt, try_join_all},
6    join,
7    stream::{Stream, StreamExt, TryStreamExt},
8    try_join,
9};
10use hotshot_query_service::{
11    ApiState,
12    availability::{
13        self, AvailabilityDataSource, BlockQueryData, Error, FetchBlockSnafu, VidCommonQueryData,
14    },
15    node::{BlockId, NodeDataSource},
16    types::HeightIndexed,
17};
18use hotshot_types::{
19    data::VidShare, simple_certificate::LightClientStateUpdateCertificateV2,
20    traits::network::ConnectedNetwork, vid::avidm::AvidMShare,
21};
22use snafu::OptionExt;
23use tide_disco::{Api, RequestParams, StatusCode, method::ReadState};
24use tracing::warn;
25use vbs::version::StaticVersionType;
26
27use crate::{
28    SeqTypes, SequencerApiVersion, SequencerPersistence,
29    api::{
30        StorageState,
31        data_source::{
32            RequestResponseDataSource, SequencerDataSource, StateCertDataSource,
33            StateCertFetchingDataSource,
34        },
35    },
36};
37
38pub(in crate::api) type AvailState<N, P, D> = ApiState<StorageState<N, P, D>>;
39
40type AvailabilityApi<N, P, D, ApiVer> = Api<AvailState<N, P, D>, Error, ApiVer>;
41
42/// Get a namespace proof for the given block, if possible.
43///
44/// Always returns the newest supported proof type, which supports the greatest number of possible
45/// cases (e.g. proofs can still be generated even if the block was maliciously encoded). For
46/// backwards compatibility, the resulting proof can be downgraded. However, this may fail in case a
47/// proof was only able to be generated by making use of one of the newer proof types.
48///
49/// Returns no proof (`Ok(None)`) if the requested namespace is not present at all in the given
50/// block.
51async fn get_namespace_proof<S>(
52    block: &BlockQueryData<SeqTypes>,
53    common: &VidCommonQueryData<SeqTypes>,
54    ns_id: NamespaceId,
55    state: &S,
56) -> Result<Option<NsProof>, Error>
57where
58    S: ReadState,
59    S::State: NodeDataSource<SeqTypes> + RequestResponseDataSource<SeqTypes> + Sync,
60{
61    let ns_table = block.payload().ns_table();
62    let Some(ns_index) = ns_table.find_ns_id(&ns_id) else {
63        return Ok(None);
64    };
65
66    // Optimistically, try to generate a
67    // proof for a correctly encoded block.
68    if let Some(proof) = NsProof::new(block.payload(), &ns_index, common.common()) {
69        return Ok(Some(proof));
70    }
71
72    // If we fail to generate the correct encoding proof, try to generate a v1.1 proof, which
73    // supports proof of incorrect encoding.
74    tracing::warn!(
75        height = block.height(),
76        ?ns_id,
77        "Failed to generate namespace proof, trying to generate incorrect encoding proof"
78    );
79    let vid_shares_req = state
80        .read(move |state| {
81            state
82                .request_vid_shares(block.height(), common.clone(), Duration::from_secs(40))
83                .boxed()
84        })
85        .await;
86    let mut vid_shares = vid_shares_req.await.map_err(|err| {
87        warn!("Failed to request VID shares from network: {err:#}");
88        hotshot_query_service::availability::Error::Custom {
89            message: "Failed to request VID shares from network".to_string(),
90            status: StatusCode::NOT_FOUND,
91        }
92    })?;
93    let vid_share = state
94        .read(|state| state.vid_share(block.height() as usize).boxed())
95        .await;
96    if let Ok(vid_share) = vid_share {
97        vid_shares.push(vid_share);
98    };
99
100    // Collect the shares as V1 shares
101    let vid_shares: Vec<AvidMShare> = vid_shares
102        .into_iter()
103        .filter_map(|share| {
104            if let VidShare::V1(share) = share {
105                Some(share)
106            } else {
107                None
108            }
109        })
110        .collect();
111
112    if let Some(proof) = NsProof::v1_1_new_with_incorrect_encoding(
113        &vid_shares,
114        ns_table,
115        &ns_index,
116        &common.payload_hash(),
117        common.common(),
118    ) {
119        return Ok(Some(proof));
120    }
121
122    Err(Error::Custom {
123        message: "Failed to generate proof of incorrect encoding".to_string(),
124        status: StatusCode::INTERNAL_SERVER_ERROR,
125    })
126}
127
128fn extract_ns_proof_v1(
129    proof: Option<NsProof>,
130    ns_id: NamespaceId,
131) -> Result<espresso_types::NamespaceProofQueryData, Error> {
132    let transactions = proof
133        .as_ref()
134        .map(|proof| proof.export_all_txs(&ns_id))
135        .unwrap_or_default();
136    Ok(espresso_types::NamespaceProofQueryData {
137        transactions,
138        proof,
139    })
140}
141
142fn extract_ns_proof_v0(
143    proof: Option<NsProof>,
144    ns_id: NamespaceId,
145) -> Result<espresso_types::ADVZNamespaceProofQueryData, Error> {
146    let proof = match proof {
147        Some(NsProof::V0(proof)) => Some(proof),
148        Some(_) => {
149            return Err(Error::Custom {
150                message: "Unsupported VID version, use new API version instead.".to_string(),
151                status: StatusCode::NOT_FOUND,
152            });
153        },
154        None => None,
155    };
156    let transactions = proof
157        .as_ref()
158        .map(|proof| proof.export_all_txs(&ns_id))
159        .unwrap_or_default();
160    Ok(espresso_types::ADVZNamespaceProofQueryData {
161        transactions,
162        proof,
163    })
164}
165
166async fn get_block_for_ns_proof<S>(
167    req: &RequestParams,
168    state: &S,
169    timeout: Duration,
170) -> Result<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>), Error>
171where
172    S: ReadState,
173    S::State: AvailabilityDataSource<SeqTypes> + Sync,
174{
175    let id = if let Some(height) = req.opt_integer_param("height")? {
176        BlockId::Number(height)
177    } else if let Some(hash) = req.opt_blob_param("hash")? {
178        BlockId::Hash(hash)
179    } else {
180        BlockId::PayloadHash(req.blob_param("payload-hash")?)
181    };
182    let (fetch_block, fetch_vid) = state
183        .read(|state| async move { join!(state.get_block(id), state.get_vid_common(id)) }.boxed())
184        .await;
185    try_join!(
186        async move {
187            fetch_block
188                .with_timeout(timeout)
189                .await
190                .context(FetchBlockSnafu {
191                    resource: id.to_string(),
192                })
193        },
194        async move {
195            fetch_vid
196                .with_timeout(timeout)
197                .await
198                .context(FetchBlockSnafu {
199                    resource: id.to_string(),
200                })
201        }
202    )
203}
204
205async fn get_block_range_for_ns_proof<S>(
206    req: &RequestParams,
207    state: &S,
208    limit: usize,
209    timeout: Duration,
210) -> Result<Vec<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>)>, Error>
211where
212    S: ReadState,
213    S::State: AvailabilityDataSource<SeqTypes> + Sync,
214{
215    let from: usize = req.integer_param("from")?;
216    let until: usize = req.integer_param("until")?;
217    if until.saturating_sub(from) > limit {
218        return Err(Error::RangeLimit { from, until, limit });
219    }
220
221    let (blocks, vids) = state
222        .read(|state| {
223            async move {
224                join!(
225                    state.get_block_range(from..until),
226                    state.get_vid_common_range(from..until)
227                )
228            }
229            .boxed()
230        })
231        .await;
232    blocks
233        .zip(vids)
234        .enumerate()
235        .then(|(i, (block, vid))| async move {
236            let (Some(block), Some(vid)) =
237                join!(block.with_timeout(timeout), vid.with_timeout(timeout),)
238            else {
239                return Err(Error::FetchBlock {
240                    resource: (from + i).to_string(),
241                });
242            };
243            Ok((block, vid))
244        })
245        .try_collect()
246        .await
247}
248
249fn get_block_stream_for_ns_proof<'a, S>(
250    req: RequestParams,
251    state: &'a S,
252) -> impl 'a
253+ Stream<
254    Item = Result<
255        (
256            NamespaceId,
257            BlockQueryData<SeqTypes>,
258            VidCommonQueryData<SeqTypes>,
259        ),
260        Error,
261    >,
262>
263where
264    S: ReadState,
265    S::State: AvailabilityDataSource<SeqTypes> + Sync,
266{
267    async move {
268        let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
269        let height = req.integer_param("height")?;
270        Ok(state
271            .read(|state| {
272                async move {
273                    state
274                        .subscribe_blocks(height)
275                        .await
276                        .zip(state.subscribe_vid_common(height).await)
277                        .map(move |(block, vid)| (ns_id, block, vid))
278                        .map(Ok)
279                }
280                .boxed()
281            })
282            .await)
283    }
284    .try_flatten_stream()
285}
286
287async fn get_state_cert<S>(
288    state: &S,
289    epoch: u64,
290    timeout: Duration,
291) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, availability::Error>
292where
293    S: ReadState,
294    S::State: StateCertDataSource + StateCertFetchingDataSource<SeqTypes> + Sync,
295{
296    // Try to get from local storage first
297    let state_cert = state
298        .read(|state| state.get_state_cert_by_epoch(epoch).boxed())
299        .await
300        .map_err(|e| availability::Error::Custom {
301            message: format!("Failed to get state cert: {e}"),
302            status: StatusCode::INTERNAL_SERVER_ERROR,
303        })?;
304
305    match state_cert {
306        Some(cert) => Ok(cert),
307        None => {
308            // Not found locally, try to fetch from peers
309            let cert = state
310                .read(|state| state.request_state_cert(epoch, timeout).boxed())
311                .await?;
312
313            // Store the fetched certificate
314            state
315                .read(|state| state.insert_state_cert(epoch, cert.clone()).boxed())
316                .await
317                .map_err(|e| availability::Error::Custom {
318                    message: format!("Failed to store state cert: {e}"),
319                    status: StatusCode::INTERNAL_SERVER_ERROR,
320                })?;
321
322            Ok(cert)
323        },
324    }
325}
326
327// TODO (abdul): replace snafu with `this_error` in  hotshot query service
328// Snafu has been replaced by `this_error` everywhere.
329// However, the query service still uses snafu
330pub(in crate::api) fn availability<N, P, D>(
331    api_ver: semver::Version,
332) -> anyhow::Result<AvailabilityApi<N, P, D, SequencerApiVersion>>
333where
334    N: ConnectedNetwork<PubKey>,
335    D: SequencerDataSource + Send + Sync + 'static,
336    P: SequencerPersistence,
337{
338    let mut options = availability::Options::default();
339    let extension = toml::from_str(include_str!("../../../api/availability.toml"))?;
340    options.extensions.push(extension);
341    let timeout = options.fetch_timeout;
342    let limit = options.large_object_range_limit;
343
344    let mut api = availability::define_api::<AvailState<N, P, D>, SeqTypes, _>(
345        &options,
346        SequencerApiVersion::instance(),
347        api_ver.clone(),
348    )?;
349
350    if api_ver.major == 1 {
351        api.at("getnamespaceproof", move |req, state| {
352            async move {
353                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
354                let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
355                let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
356                extract_ns_proof_v1(proof, ns_id)
357            }
358            .boxed()
359        })?
360        .at("getnamespaceproof_range", move |req, state| {
361            async move {
362                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
363                let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
364                try_join_all(blocks.iter().map(|(block, vid)| async move {
365                    let proof = get_namespace_proof(block, vid, ns_id, state).await?;
366                    extract_ns_proof_v1(proof, ns_id)
367                }))
368                .await
369            }
370            .boxed()
371        })?
372        .stream("stream_namespace_proofs", move |req, state| {
373            get_block_stream_for_ns_proof(req, state)
374                .and_then(move |(ns_id, block, vid)| async move {
375                    let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
376                    extract_ns_proof_v1(proof, ns_id)
377                })
378                .boxed()
379        })?;
380    } else if api_ver.major == 0 {
381        api.at("getnamespaceproof", move |req, state| {
382            async move {
383                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
384                let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
385                let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
386                extract_ns_proof_v0(proof, ns_id)
387            }
388            .boxed()
389        })?
390        .at("getnamespaceproof_range", move |req, state| {
391            async move {
392                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
393                let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
394                try_join_all(blocks.iter().map(|(block, vid)| async move {
395                    let proof = get_namespace_proof(block, vid, ns_id, state).await?;
396                    extract_ns_proof_v0(proof, ns_id)
397                }))
398                .await
399            }
400            .boxed()
401        })?
402        .stream("stream_namespace_proofs", move |req, state| {
403            get_block_stream_for_ns_proof(req, state)
404                .and_then(move |(ns_id, block, vid)| async move {
405                    let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
406                    extract_ns_proof_v0(proof, ns_id)
407                })
408                .boxed()
409        })?;
410    }
411
412    if api_ver.major >= 1 {
413        api.at("incorrect_encoding_proof", move |req, state| {
414            async move {
415                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
416                let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
417                match get_namespace_proof(&block, &common, ns_id, state).await? {
418                    Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
419                    Some(_) => Err(Error::Custom {
420                        message: "block was correctly encoded".into(),
421                        status: StatusCode::NOT_FOUND,
422                    }),
423                    None => Err(Error::Custom {
424                        message: "namespace not present in block".into(),
425                        status: StatusCode::NOT_FOUND,
426                    }),
427                }
428            }
429            .boxed()
430        })?;
431    }
432
433    api.at("get_state_cert", move |req, state| {
434        async move {
435            let epoch: u64 = req.integer_param("epoch")?;
436            let cert = get_state_cert(state, epoch, timeout).await?;
437            Ok(StateCertQueryDataV1::from(StateCertQueryDataV2(cert)))
438        }
439        .boxed()
440    })?;
441
442    api.at("get_state_cert_v2", move |req, state| {
443        async move {
444            let epoch: u64 = req.integer_param("epoch")?;
445            let cert = get_state_cert(state, epoch, timeout).await?;
446            Ok(StateCertQueryDataV2(cert))
447        }
448        .boxed()
449    })?;
450
451    Ok(api)
452}