1use std::time::Duration;
2
3use espresso_types::{NamespaceId, NsProof, PubKey, StateCertQueryDataV1, StateCertQueryDataV2};
4use futures::{
5 future::{FutureExt, TryFutureExt, try_join_all},
6 join,
7 stream::{Stream, StreamExt, TryStreamExt},
8 try_join,
9};
10use hotshot_query_service::{
11 ApiState,
12 availability::{
13 self, AvailabilityDataSource, BlockQueryData, Error, FetchBlockSnafu, VidCommonQueryData,
14 },
15 node::{BlockId, NodeDataSource},
16 types::HeightIndexed,
17};
18use hotshot_types::{
19 data::VidShare, simple_certificate::LightClientStateUpdateCertificateV2,
20 traits::network::ConnectedNetwork, vid::avidm::AvidMShare,
21};
22use snafu::OptionExt;
23use tide_disco::{Api, RequestParams, StatusCode, method::ReadState};
24use tracing::warn;
25use vbs::version::StaticVersionType;
26
27use crate::{
28 SeqTypes, SequencerApiVersion, SequencerPersistence,
29 api::{
30 StorageState,
31 data_source::{
32 RequestResponseDataSource, SequencerDataSource, StateCertDataSource,
33 StateCertFetchingDataSource,
34 },
35 },
36};
37
38pub(in crate::api) type AvailState<N, P, D> = ApiState<StorageState<N, P, D>>;
39
40type AvailabilityApi<N, P, D, ApiVer> = Api<AvailState<N, P, D>, Error, ApiVer>;
41
42async fn get_namespace_proof<S>(
52 block: &BlockQueryData<SeqTypes>,
53 common: &VidCommonQueryData<SeqTypes>,
54 ns_id: NamespaceId,
55 state: &S,
56) -> Result<Option<NsProof>, Error>
57where
58 S: ReadState,
59 S::State: NodeDataSource<SeqTypes> + RequestResponseDataSource<SeqTypes> + Sync,
60{
61 let ns_table = block.payload().ns_table();
62 let Some(ns_index) = ns_table.find_ns_id(&ns_id) else {
63 return Ok(None);
64 };
65
66 if let Some(proof) = NsProof::new(block.payload(), &ns_index, common.common()) {
69 return Ok(Some(proof));
70 }
71
72 tracing::warn!(
75 height = block.height(),
76 ?ns_id,
77 "Failed to generate namespace proof, trying to generate incorrect encoding proof"
78 );
79 let vid_shares_req = state
80 .read(move |state| {
81 state
82 .request_vid_shares(block.height(), common.clone(), Duration::from_secs(40))
83 .boxed()
84 })
85 .await;
86 let mut vid_shares = vid_shares_req.await.map_err(|err| {
87 warn!("Failed to request VID shares from network: {err:#}");
88 hotshot_query_service::availability::Error::Custom {
89 message: "Failed to request VID shares from network".to_string(),
90 status: StatusCode::NOT_FOUND,
91 }
92 })?;
93 let vid_share = state
94 .read(|state| state.vid_share(block.height() as usize).boxed())
95 .await;
96 if let Ok(vid_share) = vid_share {
97 vid_shares.push(vid_share);
98 };
99
100 let vid_shares: Vec<AvidMShare> = vid_shares
102 .into_iter()
103 .filter_map(|share| {
104 if let VidShare::V1(share) = share {
105 Some(share)
106 } else {
107 None
108 }
109 })
110 .collect();
111
112 if let Some(proof) = NsProof::v1_1_new_with_incorrect_encoding(
113 &vid_shares,
114 ns_table,
115 &ns_index,
116 &common.payload_hash(),
117 common.common(),
118 ) {
119 return Ok(Some(proof));
120 }
121
122 Err(Error::Custom {
123 message: "Failed to generate proof of incorrect encoding".to_string(),
124 status: StatusCode::INTERNAL_SERVER_ERROR,
125 })
126}
127
128fn extract_ns_proof_v1(
129 proof: Option<NsProof>,
130 ns_id: NamespaceId,
131) -> Result<espresso_types::NamespaceProofQueryData, Error> {
132 let transactions = proof
133 .as_ref()
134 .map(|proof| proof.export_all_txs(&ns_id))
135 .unwrap_or_default();
136 Ok(espresso_types::NamespaceProofQueryData {
137 transactions,
138 proof,
139 })
140}
141
142fn extract_ns_proof_v0(
143 proof: Option<NsProof>,
144 ns_id: NamespaceId,
145) -> Result<espresso_types::ADVZNamespaceProofQueryData, Error> {
146 let proof = match proof {
147 Some(NsProof::V0(proof)) => Some(proof),
148 Some(_) => {
149 return Err(Error::Custom {
150 message: "Unsupported VID version, use new API version instead.".to_string(),
151 status: StatusCode::NOT_FOUND,
152 });
153 },
154 None => None,
155 };
156 let transactions = proof
157 .as_ref()
158 .map(|proof| proof.export_all_txs(&ns_id))
159 .unwrap_or_default();
160 Ok(espresso_types::ADVZNamespaceProofQueryData {
161 transactions,
162 proof,
163 })
164}
165
166async fn get_block_for_ns_proof<S>(
167 req: &RequestParams,
168 state: &S,
169 timeout: Duration,
170) -> Result<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>), Error>
171where
172 S: ReadState,
173 S::State: AvailabilityDataSource<SeqTypes> + Sync,
174{
175 let id = if let Some(height) = req.opt_integer_param("height")? {
176 BlockId::Number(height)
177 } else if let Some(hash) = req.opt_blob_param("hash")? {
178 BlockId::Hash(hash)
179 } else {
180 BlockId::PayloadHash(req.blob_param("payload-hash")?)
181 };
182 let (fetch_block, fetch_vid) = state
183 .read(|state| async move { join!(state.get_block(id), state.get_vid_common(id)) }.boxed())
184 .await;
185 try_join!(
186 async move {
187 fetch_block
188 .with_timeout(timeout)
189 .await
190 .context(FetchBlockSnafu {
191 resource: id.to_string(),
192 })
193 },
194 async move {
195 fetch_vid
196 .with_timeout(timeout)
197 .await
198 .context(FetchBlockSnafu {
199 resource: id.to_string(),
200 })
201 }
202 )
203}
204
205async fn get_block_range_for_ns_proof<S>(
206 req: &RequestParams,
207 state: &S,
208 limit: usize,
209 timeout: Duration,
210) -> Result<Vec<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>)>, Error>
211where
212 S: ReadState,
213 S::State: AvailabilityDataSource<SeqTypes> + Sync,
214{
215 let from: usize = req.integer_param("from")?;
216 let until: usize = req.integer_param("until")?;
217 if until.saturating_sub(from) > limit {
218 return Err(Error::RangeLimit { from, until, limit });
219 }
220
221 let (blocks, vids) = state
222 .read(|state| {
223 async move {
224 join!(
225 state.get_block_range(from..until),
226 state.get_vid_common_range(from..until)
227 )
228 }
229 .boxed()
230 })
231 .await;
232 blocks
233 .zip(vids)
234 .enumerate()
235 .then(|(i, (block, vid))| async move {
236 let (Some(block), Some(vid)) =
237 join!(block.with_timeout(timeout), vid.with_timeout(timeout),)
238 else {
239 return Err(Error::FetchBlock {
240 resource: (from + i).to_string(),
241 });
242 };
243 Ok((block, vid))
244 })
245 .try_collect()
246 .await
247}
248
249fn get_block_stream_for_ns_proof<'a, S>(
250 req: RequestParams,
251 state: &'a S,
252) -> impl 'a
253+ Stream<
254 Item = Result<
255 (
256 NamespaceId,
257 BlockQueryData<SeqTypes>,
258 VidCommonQueryData<SeqTypes>,
259 ),
260 Error,
261 >,
262>
263where
264 S: ReadState,
265 S::State: AvailabilityDataSource<SeqTypes> + Sync,
266{
267 async move {
268 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
269 let height = req.integer_param("height")?;
270 Ok(state
271 .read(|state| {
272 async move {
273 state
274 .subscribe_blocks(height)
275 .await
276 .zip(state.subscribe_vid_common(height).await)
277 .map(move |(block, vid)| (ns_id, block, vid))
278 .map(Ok)
279 }
280 .boxed()
281 })
282 .await)
283 }
284 .try_flatten_stream()
285}
286
287async fn get_state_cert<S>(
288 state: &S,
289 epoch: u64,
290 timeout: Duration,
291) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, availability::Error>
292where
293 S: ReadState,
294 S::State: StateCertDataSource + StateCertFetchingDataSource<SeqTypes> + Sync,
295{
296 let state_cert = state
298 .read(|state| state.get_state_cert_by_epoch(epoch).boxed())
299 .await
300 .map_err(|e| availability::Error::Custom {
301 message: format!("Failed to get state cert: {e}"),
302 status: StatusCode::INTERNAL_SERVER_ERROR,
303 })?;
304
305 match state_cert {
306 Some(cert) => Ok(cert),
307 None => {
308 let cert = state
310 .read(|state| state.request_state_cert(epoch, timeout).boxed())
311 .await?;
312
313 state
315 .read(|state| state.insert_state_cert(epoch, cert.clone()).boxed())
316 .await
317 .map_err(|e| availability::Error::Custom {
318 message: format!("Failed to store state cert: {e}"),
319 status: StatusCode::INTERNAL_SERVER_ERROR,
320 })?;
321
322 Ok(cert)
323 },
324 }
325}
326
327pub(in crate::api) fn availability<N, P, D>(
331 api_ver: semver::Version,
332) -> anyhow::Result<AvailabilityApi<N, P, D, SequencerApiVersion>>
333where
334 N: ConnectedNetwork<PubKey>,
335 D: SequencerDataSource + Send + Sync + 'static,
336 P: SequencerPersistence,
337{
338 let mut options = availability::Options::default();
339 let extension = toml::from_str(include_str!("../../../api/availability.toml"))?;
340 options.extensions.push(extension);
341 let timeout = options.fetch_timeout;
342 let limit = options.large_object_range_limit;
343
344 let mut api = availability::define_api::<AvailState<N, P, D>, SeqTypes, _>(
345 &options,
346 SequencerApiVersion::instance(),
347 api_ver.clone(),
348 )?;
349
350 if api_ver.major == 1 {
351 api.at("getnamespaceproof", move |req, state| {
352 async move {
353 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
354 let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
355 let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
356 extract_ns_proof_v1(proof, ns_id)
357 }
358 .boxed()
359 })?
360 .at("getnamespaceproof_range", move |req, state| {
361 async move {
362 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
363 let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
364 try_join_all(blocks.iter().map(|(block, vid)| async move {
365 let proof = get_namespace_proof(block, vid, ns_id, state).await?;
366 extract_ns_proof_v1(proof, ns_id)
367 }))
368 .await
369 }
370 .boxed()
371 })?
372 .stream("stream_namespace_proofs", move |req, state| {
373 get_block_stream_for_ns_proof(req, state)
374 .and_then(move |(ns_id, block, vid)| async move {
375 let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
376 extract_ns_proof_v1(proof, ns_id)
377 })
378 .boxed()
379 })?;
380 } else if api_ver.major == 0 {
381 api.at("getnamespaceproof", move |req, state| {
382 async move {
383 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
384 let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
385 let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
386 extract_ns_proof_v0(proof, ns_id)
387 }
388 .boxed()
389 })?
390 .at("getnamespaceproof_range", move |req, state| {
391 async move {
392 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
393 let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
394 try_join_all(blocks.iter().map(|(block, vid)| async move {
395 let proof = get_namespace_proof(block, vid, ns_id, state).await?;
396 extract_ns_proof_v0(proof, ns_id)
397 }))
398 .await
399 }
400 .boxed()
401 })?
402 .stream("stream_namespace_proofs", move |req, state| {
403 get_block_stream_for_ns_proof(req, state)
404 .and_then(move |(ns_id, block, vid)| async move {
405 let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
406 extract_ns_proof_v0(proof, ns_id)
407 })
408 .boxed()
409 })?;
410 }
411
412 if api_ver.major >= 1 {
413 api.at("incorrect_encoding_proof", move |req, state| {
414 async move {
415 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
416 let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
417 match get_namespace_proof(&block, &common, ns_id, state).await? {
418 Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
419 Some(_) => Err(Error::Custom {
420 message: "block was correctly encoded".into(),
421 status: StatusCode::NOT_FOUND,
422 }),
423 None => Err(Error::Custom {
424 message: "namespace not present in block".into(),
425 status: StatusCode::NOT_FOUND,
426 }),
427 }
428 }
429 .boxed()
430 })?;
431 }
432
433 api.at("get_state_cert", move |req, state| {
434 async move {
435 let epoch: u64 = req.integer_param("epoch")?;
436 let cert = get_state_cert(state, epoch, timeout).await?;
437 Ok(StateCertQueryDataV1::from(StateCertQueryDataV2(cert)))
438 }
439 .boxed()
440 })?;
441
442 api.at("get_state_cert_v2", move |req, state| {
443 async move {
444 let epoch: u64 = req.integer_param("epoch")?;
445 let cert = get_state_cert(state, epoch, timeout).await?;
446 Ok(StateCertQueryDataV2(cert))
447 }
448 .boxed()
449 })?;
450
451 Ok(api)
452}