Skip to main content

hotshot_query_service/
node.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! A node's view of a HotShot chain
14//!
15//! The node API provides a subjective view of the HotShot blockchain, from the perspective of
16//! one particular node. It provides access to information that the
17//! [availability](crate::availability) API does not, because this information depends on the
18//! perspective of the node observing it, and may be subject to eventual consistency. For example,
19//! `/node/block-height` may return smaller counts than expected, if the node being queried is not
20//! fully synced with the entire history of the chain. However, the node will _eventually_ sync and
21//! return the expected counts.
22
23use std::{ops::Bound, path::PathBuf};
24
25use futures::FutureExt;
26use hotshot_types::traits::node_implementation::NodeType;
27use snafu::ResultExt;
28use tide_disco::{Api, api::ApiError, method::ReadState};
29use vbs::version::StaticVersionType;
30
31use crate::{Header, api::load_api, availability::QueryableHeader};
32
33pub(crate) mod data_source;
34pub(crate) mod query_data;
35pub use data_source::*;
36pub use hotshot_query_service_types::node::*;
37
38#[derive(Debug)]
39pub struct Options {
40    pub api_path: Option<PathBuf>,
41
42    /// Additional API specification files to merge with `node-api-path`.
43    ///
44    /// These optional files may contain route definitions for application-specific routes that have
45    /// been added as extensions to the basic node API.
46    pub extensions: Vec<toml::Value>,
47
48    /// The maximum number of headers which can be loaded in a single `header/window` query.
49    pub window_limit: usize,
50}
51
52impl Default for Options {
53    fn default() -> Self {
54        Self {
55            api_path: None,
56            extensions: vec![],
57            window_limit: 500,
58        }
59    }
60}
61
62pub fn define_api<State, Types, Ver: StaticVersionType + 'static>(
63    options: &Options,
64    _: Ver,
65    api_ver: semver::Version,
66) -> Result<Api<State, Error, Ver>, ApiError>
67where
68    Types: NodeType,
69    Header<Types>: QueryableHeader<Types>,
70    State: 'static + Send + Sync + ReadState,
71    <State as ReadState>::State: NodeDataSource<Types> + Send + Sync,
72{
73    let mut api = load_api::<State, Error, Ver>(
74        options.api_path.as_ref(),
75        include_str!("../api/node.toml"),
76        options.extensions.clone(),
77    )?;
78    let window_limit = options.window_limit;
79    api.with_version(api_ver)
80        .get("block_height", |_req, state| {
81            async move { state.block_height().await.context(QuerySnafu) }.boxed()
82        })?
83        .get("count_transactions", |req, state| {
84            async move {
85                let from: Bound<usize> = match req.opt_integer_param("from")? {
86                    Some(from) => Bound::Included(from),
87                    None => Bound::Unbounded,
88                };
89                let to = match req.opt_integer_param("to")? {
90                    Some(to) => Bound::Included(to),
91                    None => Bound::Unbounded,
92                };
93
94                let ns = req.opt_integer_param::<_, i64>("namespace")?;
95
96                Ok(state
97                    .count_transactions_in_range((from, to), ns.map(Into::into))
98                    .await?)
99            }
100            .boxed()
101        })?
102        .get("payload_size", |req, state| {
103            async move {
104                let from: Bound<usize> = match req.opt_integer_param("from")? {
105                    Some(from) => Bound::Included(from),
106                    None => Bound::Unbounded,
107                };
108                let to = match req.opt_integer_param("to")? {
109                    Some(to) => Bound::Included(to),
110                    None => Bound::Unbounded,
111                };
112
113                let ns = req.opt_integer_param::<_, i64>("namespace")?;
114
115                Ok(state
116                    .payload_size_in_range((from, to), ns.map(Into::into))
117                    .await?)
118            }
119            .boxed()
120        })?
121        .get("get_vid_share", |req, state| {
122            async move {
123                let id = if let Some(height) = req.opt_integer_param("height")? {
124                    BlockId::Number(height)
125                } else if let Some(hash) = req.opt_blob_param("hash")? {
126                    BlockId::Hash(hash)
127                } else {
128                    BlockId::PayloadHash(req.blob_param("payload-hash")?)
129                };
130                state.vid_share(id).await.context(QueryVidSnafu {
131                    block: id.to_string(),
132                })
133            }
134            .boxed()
135        })?
136        .get("sync_status", |_req, state| {
137            async move { state.sync_status().await.context(QuerySnafu) }.boxed()
138        })?
139        .get("get_header_window", move |req, state| {
140            async move {
141                let start = if let Some(height) = req.opt_integer_param("height")? {
142                    WindowStart::Height(height)
143                } else if let Some(hash) = req.opt_blob_param("hash")? {
144                    WindowStart::Hash(hash)
145                } else {
146                    WindowStart::Time(req.integer_param("start")?)
147                };
148                let end = req.integer_param("end")?;
149                state
150                    .get_header_window(start, end, window_limit)
151                    .await
152                    .context(QueryWindowSnafu {
153                        start: format!("{start:?}"),
154                        end,
155                    })
156            }
157            .boxed()
158        })?
159        .get("get_limits", move |_req, _state| {
160            async move { Ok(Limits { window_limit }) }.boxed()
161        })?;
162    Ok(api)
163}
164
165#[cfg(test)]
166mod test {
167    use std::time::Duration;
168
169    use async_lock::RwLock;
170    use committable::Committable;
171    use futures::{FutureExt, StreamExt};
172    use hotshot_types::{
173        data::{VidDisperseShare, VidShare},
174        event::{EventType, LeafInfo},
175        traits::{
176            EncodeBytes,
177            block_contents::{BlockHeader, BlockPayload},
178        },
179    };
180    use surf_disco::Client;
181    use tempfile::TempDir;
182    use test_utils::reserve_tcp_port;
183    use tide_disco::{App, Error as _, StatusCode};
184    use tokio::time::sleep;
185    use toml::toml;
186
187    use super::*;
188    use crate::{
189        ApiState, Error, Header,
190        data_source::ExtensibleDataSource,
191        task::BackgroundTask,
192        testing::{
193            consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
194            mocks::{MockBase, MockTypes, mock_transaction},
195        },
196    };
197
198    #[test_log::test(tokio::test(flavor = "multi_thread"))]
199    async fn test_api() {
200        let window_limit = 78;
201
202        // Create the consensus network.
203        let mut network = MockNetwork::<MockDataSource>::init().await;
204        let mut events = network.handle().event_stream();
205        network.start().await;
206
207        // Start the web server.
208        let port = reserve_tcp_port().unwrap();
209        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
210        app.register_module(
211            "node",
212            define_api(
213                &Options {
214                    window_limit,
215                    ..Default::default()
216                },
217                MockBase::instance(),
218                "1.0.0".parse().unwrap(),
219            )
220            .unwrap(),
221        )
222        .unwrap();
223        network.spawn(
224            "server",
225            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
226        );
227
228        // Start a client.
229        let client = Client::<Error, MockBase>::new(
230            format!("http://localhost:{port}/node").parse().unwrap(),
231        );
232        assert!(client.connect(Some(Duration::from_secs(60))).await);
233
234        // Check limits endpoint.
235        assert_eq!(
236            client.get::<Limits>("limits").send().await.unwrap(),
237            Limits { window_limit }
238        );
239
240        // Wait until a few blocks have been sequenced.
241        let block_height = loop {
242            let block_height = client.get::<usize>("block-height").send().await.unwrap();
243            if block_height > network.num_nodes() {
244                break block_height;
245            }
246            sleep(Duration::from_secs(1)).await;
247        };
248
249        // We test these counters with non-trivial values in `data_source.rs`, here we just want to
250        // make sure the API handlers are working, so a response of 0 is fine.
251        assert_eq!(
252            client
253                .get::<u64>("transactions/count")
254                .send()
255                .await
256                .unwrap(),
257            0
258        );
259        assert_eq!(
260            client
261                .get::<u64>("payloads/total-size")
262                .send()
263                .await
264                .unwrap(),
265            0
266        );
267
268        let mut headers = vec![];
269
270        // Get VID share for each block.
271        tracing::info!(block_height, "checking VID shares");
272        'outer: while let Some(event) = events.next().await {
273            let EventType::Decide { leaf_chain, .. } = event.event else {
274                continue;
275            };
276            for LeafInfo {
277                leaf, vid_share, ..
278            } in leaf_chain.iter().rev()
279            {
280                headers.push(leaf.block_header().clone());
281                if leaf.block_header().block_number >= block_height as u64 {
282                    break 'outer;
283                }
284                tracing::info!(height = leaf.block_header().block_number, "checking share");
285
286                let share = client
287                    .get::<VidShare>(&format!("vid/share/{}", leaf.block_header().block_number))
288                    .send()
289                    .await
290                    .unwrap();
291                if let Some(vid_share) = vid_share.as_ref() {
292                    let VidDisperseShare::V0(new_share) = vid_share else {
293                        panic!("VID share is not V0");
294                    };
295                    assert_eq!(share, VidShare::V0(new_share.share.clone()));
296                }
297
298                // Query various other ways.
299                assert_eq!(
300                    share,
301                    client
302                        .get(&format!("vid/share/hash/{}", leaf.block_header().commit()))
303                        .send()
304                        .await
305                        .unwrap()
306                );
307                assert_eq!(
308                    share,
309                    client
310                        .get(&format!(
311                            "vid/share/payload-hash/{}",
312                            leaf.block_header().payload_commitment
313                        ))
314                        .send()
315                        .await
316                        .unwrap()
317                );
318            }
319        }
320
321        // Check time window queries. The various edge cases are thoroughly tested for each
322        // individual data source. In this test, we just smoketest API parameter handling. Sleep 2
323        // seconds to ensure a new header is produced with a timestamp after the latest one in
324        // `headers`
325        sleep(Duration::from_secs(2)).await;
326        let first_header = &headers[0];
327        let last_header = &headers.last().unwrap();
328        let window: TimeWindowQueryData<Header<MockTypes>> = client
329            .get(&format!(
330                "header/window/{}/{}",
331                first_header.timestamp,
332                last_header.timestamp + 1
333            ))
334            .send()
335            .await
336            .unwrap();
337        assert!(window.window.contains(first_header));
338        assert!(window.window.contains(last_header));
339        assert!(window.next.is_some());
340
341        // Query for the same window other ways.
342        assert_eq!(
343            window,
344            client
345                .get(&format!(
346                    "header/window/from/0/{}",
347                    last_header.timestamp + 1
348                ))
349                .send()
350                .await
351                .unwrap()
352        );
353        assert_eq!(
354            window,
355            client
356                .get(&format!(
357                    "header/window/from/hash/{}/{}",
358                    first_header.commit(),
359                    last_header.timestamp + 1
360                ))
361                .send()
362                .await
363                .unwrap()
364        );
365
366        // In this simple test, the node should be fully synchronized.
367        let sync_status = client
368            .get::<SyncStatusQueryData>("sync-status")
369            .send()
370            .await
371            .unwrap();
372        assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
373
374        network.shut_down().await;
375    }
376
377    #[test_log::test(tokio::test(flavor = "multi_thread"))]
378    async fn test_aggregate_ranges() {
379        // Create the consensus network.
380        let mut network = MockNetwork::<MockSqlDataSource>::init().await;
381        let mut events = network.handle().event_stream();
382        network.start().await;
383
384        // Start the web server.
385        let port = reserve_tcp_port().unwrap();
386        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
387        app.register_module(
388            "node",
389            define_api(
390                &Default::default(),
391                MockBase::instance(),
392                "1.0.0".parse().unwrap(),
393            )
394            .unwrap(),
395        )
396        .unwrap();
397        network.spawn(
398            "server",
399            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
400        );
401
402        // Start a client.
403        let client =
404            Client::<Error, MockBase>::new(format!("http://localhost:{port}").parse().unwrap());
405        assert!(client.connect(Some(Duration::from_secs(60))).await);
406
407        // Wait until a few transactions have been sequenced.
408        let mut tx_heights = vec![];
409        let mut tx_sizes = vec![];
410        for i in [1, 2] {
411            let txn = mock_transaction(vec![0; i]);
412            let hash = txn.commit();
413
414            network.submit_transaction(txn).await;
415
416            let leaf = 'outer: loop {
417                let EventType::Decide { leaf_chain, .. } = events.next().await.unwrap().event
418                else {
419                    continue;
420                };
421                for info in leaf_chain.iter().rev() {
422                    let leaf = &info.leaf;
423                    if BlockPayload::<MockTypes>::transaction_commitments(
424                        &leaf.block_payload().unwrap(),
425                        BlockHeader::<MockTypes>::metadata(leaf.block_header()),
426                    )
427                    .contains(&hash)
428                    {
429                        break 'outer leaf.clone();
430                    }
431                }
432
433                tracing::info!("waiting for tx {i}");
434                sleep(Duration::from_secs(1)).await;
435            };
436            tx_heights.push(leaf.height());
437            tx_sizes.push(leaf.block_payload().unwrap().encode().len());
438        }
439        tracing::info!(?tx_heights, ?tx_sizes, "transactions sequenced");
440
441        // Wait for the aggregator to process the inserted blocks.
442        while let Err(err) = client
443            .get::<usize>(&format!("node/transactions/count/{}", tx_heights[1]))
444            .send()
445            .await
446        {
447            if err.status() == StatusCode::NOT_FOUND {
448                tracing::info!(?tx_heights, "waiting for aggregator");
449                sleep(Duration::from_secs(1)).await;
450                continue;
451            } else {
452                panic!("unexpected error: {err:#}");
453            }
454        }
455
456        // Range including empty blocks (genesis block) only
457        assert_eq!(
458            0,
459            client
460                .get::<usize>("node/transactions/count/0")
461                .send()
462                .await
463                .unwrap()
464        );
465        assert_eq!(
466            0,
467            client
468                .get::<usize>("node/payloads/size/0")
469                .send()
470                .await
471                .unwrap()
472        );
473
474        // First transaction only
475        assert_eq!(
476            1,
477            client
478                .get::<usize>(&format!("node/transactions/count/{}", tx_heights[0]))
479                .send()
480                .await
481                .unwrap()
482        );
483        assert_eq!(
484            tx_sizes[0],
485            client
486                .get::<usize>(&format!("node/payloads/size/{}", tx_heights[0]))
487                .send()
488                .await
489                .unwrap()
490        );
491
492        // Last transaction only
493        assert_eq!(
494            1,
495            client
496                .get::<usize>(&format!(
497                    "node/transactions/count/{}/{}",
498                    tx_heights[0] + 1,
499                    tx_heights[1]
500                ))
501                .send()
502                .await
503                .unwrap()
504        );
505        assert_eq!(
506            tx_sizes[1],
507            client
508                .get::<usize>(&format!(
509                    "node/payloads/size/{}/{}",
510                    tx_heights[0] + 1,
511                    tx_heights[1]
512                ))
513                .send()
514                .await
515                .unwrap()
516        );
517
518        // All transactions
519        assert_eq!(
520            2,
521            client
522                .get::<usize>("node/transactions/count",)
523                .send()
524                .await
525                .unwrap()
526        );
527        assert_eq!(
528            tx_sizes[0] + tx_sizes[1],
529            client
530                .get::<usize>("node/payloads/size",)
531                .send()
532                .await
533                .unwrap()
534        );
535
536        network.shut_down().await;
537    }
538
539    #[test_log::test(tokio::test(flavor = "multi_thread"))]
540    async fn test_extensions() {
541        let dir = TempDir::with_prefix("test_node_extensions").unwrap();
542        let data_source = ExtensibleDataSource::new(
543            MockDataSource::create(dir.path(), Default::default())
544                .await
545                .unwrap(),
546            0,
547        );
548
549        // Create the API extensions specification.
550        let extensions = toml! {
551            [route.post_ext]
552            PATH = ["/ext/:val"]
553            METHOD = "POST"
554            ":val" = "Integer"
555
556            [route.get_ext]
557            PATH = ["/ext"]
558            METHOD = "GET"
559        };
560
561        let mut api =
562            define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
563                &Options {
564                    extensions: vec![extensions.into()],
565                    ..Default::default()
566                },
567                MockBase::instance(),
568                "1.0.0".parse().unwrap(),
569            )
570            .unwrap();
571        api.get("get_ext", |_, state| {
572            async move { Ok(*state.as_ref()) }.boxed()
573        })
574        .unwrap()
575        .post("post_ext", |req, state| {
576            async move {
577                *state.as_mut() = req.integer_param("val")?;
578                Ok(())
579            }
580            .boxed()
581        })
582        .unwrap();
583
584        let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
585        app.register_module("node", api).unwrap();
586
587        let port = reserve_tcp_port().unwrap();
588        let _server = BackgroundTask::spawn(
589            "server",
590            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
591        );
592
593        let client = Client::<Error, MockBase>::new(
594            format!("http://localhost:{port}/node").parse().unwrap(),
595        );
596        assert!(client.connect(Some(Duration::from_secs(60))).await);
597
598        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
599        client.post::<()>("ext/42").send().await.unwrap();
600        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
601
602        // Ensure we can still access the built-in functionality.
603        let sync_status: SyncStatusQueryData = client.get("sync-status").send().await.unwrap();
604        assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
605    }
606}