1use std::{ops::Bound, path::PathBuf};
24
25use futures::FutureExt;
26use hotshot_types::traits::node_implementation::NodeType;
27use snafu::ResultExt;
28use tide_disco::{Api, api::ApiError, method::ReadState};
29use vbs::version::StaticVersionType;
30
31use crate::{Header, api::load_api, availability::QueryableHeader};
32
33pub(crate) mod data_source;
34pub(crate) mod query_data;
35pub use data_source::*;
36pub use hotshot_query_service_types::node::*;
37
38#[derive(Debug)]
39pub struct Options {
40 pub api_path: Option<PathBuf>,
41
42 pub extensions: Vec<toml::Value>,
47
48 pub window_limit: usize,
50}
51
52impl Default for Options {
53 fn default() -> Self {
54 Self {
55 api_path: None,
56 extensions: vec![],
57 window_limit: 500,
58 }
59 }
60}
61
62pub fn define_api<State, Types, Ver: StaticVersionType + 'static>(
63 options: &Options,
64 _: Ver,
65 api_ver: semver::Version,
66) -> Result<Api<State, Error, Ver>, ApiError>
67where
68 Types: NodeType,
69 Header<Types>: QueryableHeader<Types>,
70 State: 'static + Send + Sync + ReadState,
71 <State as ReadState>::State: NodeDataSource<Types> + Send + Sync,
72{
73 let mut api = load_api::<State, Error, Ver>(
74 options.api_path.as_ref(),
75 include_str!("../api/node.toml"),
76 options.extensions.clone(),
77 )?;
78 let window_limit = options.window_limit;
79 api.with_version(api_ver)
80 .get("block_height", |_req, state| {
81 async move { state.block_height().await.context(QuerySnafu) }.boxed()
82 })?
83 .get("count_transactions", |req, state| {
84 async move {
85 let from: Bound<usize> = match req.opt_integer_param("from")? {
86 Some(from) => Bound::Included(from),
87 None => Bound::Unbounded,
88 };
89 let to = match req.opt_integer_param("to")? {
90 Some(to) => Bound::Included(to),
91 None => Bound::Unbounded,
92 };
93
94 let ns = req.opt_integer_param::<_, i64>("namespace")?;
95
96 Ok(state
97 .count_transactions_in_range((from, to), ns.map(Into::into))
98 .await?)
99 }
100 .boxed()
101 })?
102 .get("payload_size", |req, state| {
103 async move {
104 let from: Bound<usize> = match req.opt_integer_param("from")? {
105 Some(from) => Bound::Included(from),
106 None => Bound::Unbounded,
107 };
108 let to = match req.opt_integer_param("to")? {
109 Some(to) => Bound::Included(to),
110 None => Bound::Unbounded,
111 };
112
113 let ns = req.opt_integer_param::<_, i64>("namespace")?;
114
115 Ok(state
116 .payload_size_in_range((from, to), ns.map(Into::into))
117 .await?)
118 }
119 .boxed()
120 })?
121 .get("get_vid_share", |req, state| {
122 async move {
123 let id = if let Some(height) = req.opt_integer_param("height")? {
124 BlockId::Number(height)
125 } else if let Some(hash) = req.opt_blob_param("hash")? {
126 BlockId::Hash(hash)
127 } else {
128 BlockId::PayloadHash(req.blob_param("payload-hash")?)
129 };
130 state.vid_share(id).await.context(QueryVidSnafu {
131 block: id.to_string(),
132 })
133 }
134 .boxed()
135 })?
136 .get("sync_status", |_req, state| {
137 async move { state.sync_status().await.context(QuerySnafu) }.boxed()
138 })?
139 .get("get_header_window", move |req, state| {
140 async move {
141 let start = if let Some(height) = req.opt_integer_param("height")? {
142 WindowStart::Height(height)
143 } else if let Some(hash) = req.opt_blob_param("hash")? {
144 WindowStart::Hash(hash)
145 } else {
146 WindowStart::Time(req.integer_param("start")?)
147 };
148 let end = req.integer_param("end")?;
149 state
150 .get_header_window(start, end, window_limit)
151 .await
152 .context(QueryWindowSnafu {
153 start: format!("{start:?}"),
154 end,
155 })
156 }
157 .boxed()
158 })?
159 .get("get_limits", move |_req, _state| {
160 async move { Ok(Limits { window_limit }) }.boxed()
161 })?;
162 Ok(api)
163}
164
165#[cfg(test)]
166mod test {
167 use std::time::Duration;
168
169 use async_lock::RwLock;
170 use committable::Committable;
171 use futures::{FutureExt, StreamExt};
172 use hotshot_types::{
173 data::{VidDisperseShare, VidShare},
174 event::{EventType, LeafInfo},
175 traits::{
176 EncodeBytes,
177 block_contents::{BlockHeader, BlockPayload},
178 },
179 };
180 use surf_disco::Client;
181 use tempfile::TempDir;
182 use test_utils::reserve_tcp_port;
183 use tide_disco::{App, Error as _, StatusCode};
184 use tokio::time::sleep;
185 use toml::toml;
186
187 use super::*;
188 use crate::{
189 ApiState, Error, Header,
190 data_source::ExtensibleDataSource,
191 task::BackgroundTask,
192 testing::{
193 consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
194 mocks::{MockBase, MockTypes, mock_transaction},
195 },
196 };
197
198 #[test_log::test(tokio::test(flavor = "multi_thread"))]
199 async fn test_api() {
200 let window_limit = 78;
201
202 let mut network = MockNetwork::<MockDataSource>::init().await;
204 let mut events = network.handle().event_stream();
205 network.start().await;
206
207 let port = reserve_tcp_port().unwrap();
209 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
210 app.register_module(
211 "node",
212 define_api(
213 &Options {
214 window_limit,
215 ..Default::default()
216 },
217 MockBase::instance(),
218 "1.0.0".parse().unwrap(),
219 )
220 .unwrap(),
221 )
222 .unwrap();
223 network.spawn(
224 "server",
225 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
226 );
227
228 let client = Client::<Error, MockBase>::new(
230 format!("http://localhost:{port}/node").parse().unwrap(),
231 );
232 assert!(client.connect(Some(Duration::from_secs(60))).await);
233
234 assert_eq!(
236 client.get::<Limits>("limits").send().await.unwrap(),
237 Limits { window_limit }
238 );
239
240 let block_height = loop {
242 let block_height = client.get::<usize>("block-height").send().await.unwrap();
243 if block_height > network.num_nodes() {
244 break block_height;
245 }
246 sleep(Duration::from_secs(1)).await;
247 };
248
249 assert_eq!(
252 client
253 .get::<u64>("transactions/count")
254 .send()
255 .await
256 .unwrap(),
257 0
258 );
259 assert_eq!(
260 client
261 .get::<u64>("payloads/total-size")
262 .send()
263 .await
264 .unwrap(),
265 0
266 );
267
268 let mut headers = vec![];
269
270 tracing::info!(block_height, "checking VID shares");
272 'outer: while let Some(event) = events.next().await {
273 let EventType::Decide { leaf_chain, .. } = event.event else {
274 continue;
275 };
276 for LeafInfo {
277 leaf, vid_share, ..
278 } in leaf_chain.iter().rev()
279 {
280 headers.push(leaf.block_header().clone());
281 if leaf.block_header().block_number >= block_height as u64 {
282 break 'outer;
283 }
284 tracing::info!(height = leaf.block_header().block_number, "checking share");
285
286 let share = client
287 .get::<VidShare>(&format!("vid/share/{}", leaf.block_header().block_number))
288 .send()
289 .await
290 .unwrap();
291 if let Some(vid_share) = vid_share.as_ref() {
292 let VidDisperseShare::V0(new_share) = vid_share else {
293 panic!("VID share is not V0");
294 };
295 assert_eq!(share, VidShare::V0(new_share.share.clone()));
296 }
297
298 assert_eq!(
300 share,
301 client
302 .get(&format!("vid/share/hash/{}", leaf.block_header().commit()))
303 .send()
304 .await
305 .unwrap()
306 );
307 assert_eq!(
308 share,
309 client
310 .get(&format!(
311 "vid/share/payload-hash/{}",
312 leaf.block_header().payload_commitment
313 ))
314 .send()
315 .await
316 .unwrap()
317 );
318 }
319 }
320
321 sleep(Duration::from_secs(2)).await;
326 let first_header = &headers[0];
327 let last_header = &headers.last().unwrap();
328 let window: TimeWindowQueryData<Header<MockTypes>> = client
329 .get(&format!(
330 "header/window/{}/{}",
331 first_header.timestamp,
332 last_header.timestamp + 1
333 ))
334 .send()
335 .await
336 .unwrap();
337 assert!(window.window.contains(first_header));
338 assert!(window.window.contains(last_header));
339 assert!(window.next.is_some());
340
341 assert_eq!(
343 window,
344 client
345 .get(&format!(
346 "header/window/from/0/{}",
347 last_header.timestamp + 1
348 ))
349 .send()
350 .await
351 .unwrap()
352 );
353 assert_eq!(
354 window,
355 client
356 .get(&format!(
357 "header/window/from/hash/{}/{}",
358 first_header.commit(),
359 last_header.timestamp + 1
360 ))
361 .send()
362 .await
363 .unwrap()
364 );
365
366 let sync_status = client
368 .get::<SyncStatusQueryData>("sync-status")
369 .send()
370 .await
371 .unwrap();
372 assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
373
374 network.shut_down().await;
375 }
376
377 #[test_log::test(tokio::test(flavor = "multi_thread"))]
378 async fn test_aggregate_ranges() {
379 let mut network = MockNetwork::<MockSqlDataSource>::init().await;
381 let mut events = network.handle().event_stream();
382 network.start().await;
383
384 let port = reserve_tcp_port().unwrap();
386 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
387 app.register_module(
388 "node",
389 define_api(
390 &Default::default(),
391 MockBase::instance(),
392 "1.0.0".parse().unwrap(),
393 )
394 .unwrap(),
395 )
396 .unwrap();
397 network.spawn(
398 "server",
399 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
400 );
401
402 let client =
404 Client::<Error, MockBase>::new(format!("http://localhost:{port}").parse().unwrap());
405 assert!(client.connect(Some(Duration::from_secs(60))).await);
406
407 let mut tx_heights = vec![];
409 let mut tx_sizes = vec![];
410 for i in [1, 2] {
411 let txn = mock_transaction(vec![0; i]);
412 let hash = txn.commit();
413
414 network.submit_transaction(txn).await;
415
416 let leaf = 'outer: loop {
417 let EventType::Decide { leaf_chain, .. } = events.next().await.unwrap().event
418 else {
419 continue;
420 };
421 for info in leaf_chain.iter().rev() {
422 let leaf = &info.leaf;
423 if BlockPayload::<MockTypes>::transaction_commitments(
424 &leaf.block_payload().unwrap(),
425 BlockHeader::<MockTypes>::metadata(leaf.block_header()),
426 )
427 .contains(&hash)
428 {
429 break 'outer leaf.clone();
430 }
431 }
432
433 tracing::info!("waiting for tx {i}");
434 sleep(Duration::from_secs(1)).await;
435 };
436 tx_heights.push(leaf.height());
437 tx_sizes.push(leaf.block_payload().unwrap().encode().len());
438 }
439 tracing::info!(?tx_heights, ?tx_sizes, "transactions sequenced");
440
441 while let Err(err) = client
443 .get::<usize>(&format!("node/transactions/count/{}", tx_heights[1]))
444 .send()
445 .await
446 {
447 if err.status() == StatusCode::NOT_FOUND {
448 tracing::info!(?tx_heights, "waiting for aggregator");
449 sleep(Duration::from_secs(1)).await;
450 continue;
451 } else {
452 panic!("unexpected error: {err:#}");
453 }
454 }
455
456 assert_eq!(
458 0,
459 client
460 .get::<usize>("node/transactions/count/0")
461 .send()
462 .await
463 .unwrap()
464 );
465 assert_eq!(
466 0,
467 client
468 .get::<usize>("node/payloads/size/0")
469 .send()
470 .await
471 .unwrap()
472 );
473
474 assert_eq!(
476 1,
477 client
478 .get::<usize>(&format!("node/transactions/count/{}", tx_heights[0]))
479 .send()
480 .await
481 .unwrap()
482 );
483 assert_eq!(
484 tx_sizes[0],
485 client
486 .get::<usize>(&format!("node/payloads/size/{}", tx_heights[0]))
487 .send()
488 .await
489 .unwrap()
490 );
491
492 assert_eq!(
494 1,
495 client
496 .get::<usize>(&format!(
497 "node/transactions/count/{}/{}",
498 tx_heights[0] + 1,
499 tx_heights[1]
500 ))
501 .send()
502 .await
503 .unwrap()
504 );
505 assert_eq!(
506 tx_sizes[1],
507 client
508 .get::<usize>(&format!(
509 "node/payloads/size/{}/{}",
510 tx_heights[0] + 1,
511 tx_heights[1]
512 ))
513 .send()
514 .await
515 .unwrap()
516 );
517
518 assert_eq!(
520 2,
521 client
522 .get::<usize>("node/transactions/count",)
523 .send()
524 .await
525 .unwrap()
526 );
527 assert_eq!(
528 tx_sizes[0] + tx_sizes[1],
529 client
530 .get::<usize>("node/payloads/size",)
531 .send()
532 .await
533 .unwrap()
534 );
535
536 network.shut_down().await;
537 }
538
539 #[test_log::test(tokio::test(flavor = "multi_thread"))]
540 async fn test_extensions() {
541 let dir = TempDir::with_prefix("test_node_extensions").unwrap();
542 let data_source = ExtensibleDataSource::new(
543 MockDataSource::create(dir.path(), Default::default())
544 .await
545 .unwrap(),
546 0,
547 );
548
549 let extensions = toml! {
551 [route.post_ext]
552 PATH = ["/ext/:val"]
553 METHOD = "POST"
554 ":val" = "Integer"
555
556 [route.get_ext]
557 PATH = ["/ext"]
558 METHOD = "GET"
559 };
560
561 let mut api =
562 define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
563 &Options {
564 extensions: vec![extensions.into()],
565 ..Default::default()
566 },
567 MockBase::instance(),
568 "1.0.0".parse().unwrap(),
569 )
570 .unwrap();
571 api.get("get_ext", |_, state| {
572 async move { Ok(*state.as_ref()) }.boxed()
573 })
574 .unwrap()
575 .post("post_ext", |req, state| {
576 async move {
577 *state.as_mut() = req.integer_param("val")?;
578 Ok(())
579 }
580 .boxed()
581 })
582 .unwrap();
583
584 let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
585 app.register_module("node", api).unwrap();
586
587 let port = reserve_tcp_port().unwrap();
588 let _server = BackgroundTask::spawn(
589 "server",
590 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
591 );
592
593 let client = Client::<Error, MockBase>::new(
594 format!("http://localhost:{port}/node").parse().unwrap(),
595 );
596 assert!(client.connect(Some(Duration::from_secs(60))).await);
597
598 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
599 client.post::<()>("ext/42").send().await.unwrap();
600 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
601
602 let sync_status: SyncStatusQueryData = client.get("sync-status").send().await.unwrap();
604 assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
605 }
606}