1use std::future::Future;
2
3use anyhow::Result;
4use espresso_types::{NamespaceId, SeqTypes, v0_3::StakeTableEvent};
5use hotshot_query_service::{
6 availability::{LeafId, LeafQueryData},
7 node::BlockId,
8};
9use hotshot_types::data::EpochNumber;
10use surf_disco::Url;
11use vbs::version::StaticVersion;
12
13use crate::{
14 consensus::{
15 header::HeaderProof, leaf::LeafProof, namespace::NamespaceProof, payload::PayloadProof,
16 },
17 storage::LeafRequest,
18};
19
20pub trait Client: Send + Sync + 'static {
22 fn block_height(&self) -> impl Send + Future<Output = Result<u64>>;
26
27 fn leaf_proof(
33 &self,
34 id: impl Into<LeafRequest> + Send,
35 finalized: Option<u64>,
36 ) -> impl Send + Future<Output = Result<LeafProof>>;
37
38 fn header_proof(
40 &self,
41 root: u64,
42 id: BlockId<SeqTypes>,
43 ) -> impl Send + Future<Output = Result<HeaderProof>>;
44
45 fn get_leaves_in_range(
47 &self,
48 start: usize,
49 end: usize,
50 ) -> impl Send
51 + Future<
52 Output = Result<Vec<hotshot_query_service::availability::LeafQueryData<SeqTypes>>>,
53 >;
54
55 fn payload_proof(&self, height: u64) -> impl Send + Future<Output = Result<PayloadProof>>;
61
62 fn namespace_proof(
68 &self,
69 height: u64,
70 namespace: NamespaceId,
71 ) -> impl Send + Future<Output = Result<NamespaceProof>>;
72
73 fn namespace_proofs_in_range(
75 &self,
76 start: u64,
77 end: u64,
78 namespace: NamespaceId,
79 ) -> impl Send + Future<Output = Result<Vec<NamespaceProof>>>;
80
81 fn stake_table_events(
86 &self,
87 epoch: EpochNumber,
88 ) -> impl Send + Future<Output = Result<Vec<StakeTableEvent>>>;
89}
90
91#[derive(Clone, Debug)]
93pub struct QueryServiceClient {
94 client: surf_disco::Client<hotshot_query_service::Error, StaticVersion<0, 1>>,
95}
96
97impl QueryServiceClient {
98 pub fn new(url: Url) -> Self {
100 Self {
101 client: surf_disco::Client::new(url),
102 }
103 }
104}
105
106impl Client for QueryServiceClient {
107 async fn block_height(&self) -> Result<u64> {
108 Ok(self.client.get("/node/block-height").send().await?)
109 }
110
111 async fn leaf_proof(
112 &self,
113 id: impl Into<LeafRequest> + Send,
114 finalized: Option<u64>,
115 ) -> Result<LeafProof> {
116 let path = "/light-client/leaf";
117 let path = match id.into() {
118 LeafRequest::Leaf(LeafId::Number(n)) | LeafRequest::Header(BlockId::Number(n)) => {
119 format!("{path}/{n}")
120 },
121 LeafRequest::Leaf(LeafId::Hash(h)) => format!("{path}/hash/{h}"),
122 LeafRequest::Header(BlockId::Hash(h)) => format!("{path}/block-hash/{h}"),
123 LeafRequest::Header(BlockId::PayloadHash(h)) => format!("{path}/payload-hash/{h}"),
124 };
125 let path = match finalized {
126 Some(finalized) => format!("{path}/{finalized}"),
127 None => path,
128 };
129 let proof = self.client.get(&path).send().await?;
130 Ok(proof)
131 }
132
133 async fn get_leaves_in_range(
135 &self,
136 start: usize,
137 end: usize,
138 ) -> Result<Vec<LeafQueryData<SeqTypes>>> {
139 let path = format!("/availability/leaf/{start}/{end}");
140 let leaves = self.client.get(&path).send().await?;
141 Ok(leaves)
142 }
143
144 async fn header_proof(&self, root: u64, id: BlockId<SeqTypes>) -> Result<HeaderProof> {
145 let path = format!("/light-client/header/{root}/{}", fmt_block_id(id));
146 let proof = self.client.get(&path).send().await?;
147 Ok(proof)
148 }
149
150 async fn payload_proof(&self, height: u64) -> Result<PayloadProof> {
151 let path = format!("/light-client/payload/{height}");
152 Ok(self.client.get(&path).send().await?)
153 }
154
155 async fn namespace_proof(&self, height: u64, namespace: NamespaceId) -> Result<NamespaceProof> {
156 Ok(self
157 .client
158 .get(&format!("/light-client/namespace/{height}/{namespace}"))
159 .send()
160 .await?)
161 }
162
163 async fn namespace_proofs_in_range(
164 &self,
165 start: u64,
166 end: u64,
167 namespace: NamespaceId,
168 ) -> Result<Vec<NamespaceProof>> {
169 Ok(self
170 .client
171 .get(&format!(
172 "/light-client/namespace/{start}/{end}/{namespace}"
173 ))
174 .send()
175 .await?)
176 }
177
178 async fn stake_table_events(&self, epoch: EpochNumber) -> Result<Vec<StakeTableEvent>> {
179 Ok(self
180 .client
181 .get(&format!("/light-client/stake-table/{epoch}"))
182 .send()
183 .await?)
184 }
185}
186
187fn fmt_block_id(id: BlockId<SeqTypes>) -> String {
188 match id {
189 BlockId::Number(n) => format!("{n}"),
190 BlockId::Hash(h) => format!("hash/{h}"),
191 BlockId::PayloadHash(h) => format!("payload-hash/{h}"),
192 }
193}
194
195#[cfg(test)]
196mod test {
197 use std::time::Duration;
198
199 use committable::Committable;
200 use espresso_node::{
201 api::{
202 Options,
203 data_source::testing::TestableSequencerDataSource,
204 sql::DataSource,
205 test_helpers::{TestNetwork, TestNetworkConfigBuilder},
206 },
207 testing::{TestConfigBuilder, wait_for_decide_on_handle},
208 };
209 use espresso_types::{Header, Transaction};
210 use futures::{TryStreamExt, stream::StreamExt};
211 use hotshot_query_service::{
212 Resolvable,
213 availability::{BlockQueryData, LeafQueryData},
214 };
215 use pretty_assertions::assert_eq;
216 use rand::RngCore;
217 use test_utils;
218 use tokio::time::sleep;
219 use versions::{EPOCH_VERSION, Upgrade};
220
221 use super::*;
222 use crate::{
223 consensus::leaf::{FinalityProof, LeafProofHint},
224 testing::AlwaysTrueQuorum,
225 };
226
227 #[tokio::test]
228 #[test_log::test]
229 async fn test_block_height() {
230 let port =
231 test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
232 let url: Url = format!("http://localhost:{port}").parse().unwrap();
233
234 let test_config = TestConfigBuilder::default().build();
235 let storage = DataSource::create_storage().await;
236 let persistence =
237 <DataSource as TestableSequencerDataSource>::persistence_options(&storage);
238
239 let config = TestNetworkConfigBuilder::<1, _, _>::with_num_nodes()
240 .api_config(
241 DataSource::options(&storage, Options::with_port(port))
242 .light_client(Default::default()),
243 )
244 .persistences([persistence])
245 .network_config(test_config)
246 .build();
247
248 let _network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
249 let client = QueryServiceClient::new(url);
250
251 let initial_height = client.block_height().await.unwrap();
253 tracing::info!(initial_height);
254
255 loop {
256 sleep(Duration::from_secs(1)).await;
257 let height = client.block_height().await.unwrap();
258 if height > initial_height {
259 tracing::info!(height, initial_height, "height increased");
260 break;
261 }
262 tracing::info!("waiting for height to increase");
263 }
264 }
265
266 #[tokio::test]
267 #[test_log::test]
268 async fn test_leaf_proof() {
269 let port =
270 test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
271 let url: Url = format!("http://localhost:{port}").parse().unwrap();
272
273 let test_config = TestConfigBuilder::default().build();
274 let storage = DataSource::create_storage().await;
275 let persistence =
276 <DataSource as TestableSequencerDataSource>::persistence_options(&storage);
277
278 let config = TestNetworkConfigBuilder::<1, _, _>::with_num_nodes()
279 .api_config(
280 DataSource::options(&storage, Options::with_port(port))
281 .light_client(Default::default()),
282 )
283 .persistences([persistence])
284 .network_config(test_config)
285 .build();
286
287 let _network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
288 let client = QueryServiceClient::new(url);
289
290 let leaves: Vec<LeafQueryData<SeqTypes>> = client
292 .client
293 .socket("availability/stream/leaves/1")
294 .subscribe()
295 .await
296 .unwrap()
297 .take(2)
298 .try_collect()
299 .await
300 .unwrap();
301
302 let proof = client.leaf_proof(LeafId::Number(1), Some(2)).await.unwrap();
304 assert!(matches!(proof.proof(), FinalityProof::Assumption));
305 assert_eq!(
306 proof
307 .verify(LeafProofHint::assumption(leaves[1].leaf()))
308 .await
309 .unwrap(),
310 leaves[0]
311 );
312
313 for req in [
315 LeafRequest::Header(BlockId::Number(1)),
316 LeafRequest::Leaf(LeafId::Hash(leaves[0].hash())),
317 LeafRequest::Header(BlockId::Hash(leaves[0].block_hash())),
318 ] {
319 tracing::info!(?req, "get proof by alternative ID");
320 let proof = client.leaf_proof(req, None).await.unwrap();
321 assert!(matches!(proof.proof(), FinalityProof::HotStuff2 { .. }));
322 assert_eq!(
323 proof
324 .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
325 .await
326 .unwrap(),
327 leaves[0]
328 );
329 }
330
331 let proof = client
334 .leaf_proof(BlockId::PayloadHash(leaves[0].payload_hash()), None)
335 .await
336 .unwrap();
337 assert_eq!(
338 proof
339 .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
340 .await
341 .unwrap()
342 .payload_hash(),
343 leaves[0].payload_hash()
344 );
345 }
346
347 #[tokio::test]
348 #[test_log::test]
349 async fn test_header_proof() {
350 let port =
351 test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
352 let url: Url = format!("http://localhost:{port}").parse().unwrap();
353
354 let test_config = TestConfigBuilder::default().build();
355 let storage = DataSource::create_storage().await;
356 let persistence =
357 <DataSource as TestableSequencerDataSource>::persistence_options(&storage);
358
359 let config = TestNetworkConfigBuilder::<1, _, _>::with_num_nodes()
360 .api_config(
361 DataSource::options(&storage, Options::with_port(port))
362 .light_client(Default::default()),
363 )
364 .persistences([persistence])
365 .network_config(test_config)
366 .build();
367
368 let _network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
369 let client = QueryServiceClient::new(url);
370
371 let headers: Vec<Header> = client
373 .client
374 .socket("availability/stream/headers/1")
375 .subscribe()
376 .await
377 .unwrap()
378 .take(2)
379 .try_collect()
380 .await
381 .unwrap();
382 loop {
384 let state_height: u64 = client
385 .client
386 .get("block-state/block-height")
387 .send()
388 .await
389 .unwrap();
390 if state_height >= 2 {
391 break;
392 }
393 tracing::info!(state_height, "waiting for block state to reach height 2");
394 sleep(Duration::from_secs(1)).await;
395 }
396
397 let proof = client.header_proof(2, BlockId::Number(1)).await.unwrap();
399 assert_eq!(
400 proof.verify(headers[1].block_merkle_tree_root()).unwrap(),
401 headers[0]
402 );
403
404 let proof = client
406 .header_proof(2, BlockId::Hash(headers[0].commitment()))
407 .await
408 .unwrap();
409 assert_eq!(
410 proof.verify(headers[1].block_merkle_tree_root()).unwrap(),
411 headers[0]
412 );
413
414 let proof = client
417 .header_proof(2, BlockId::PayloadHash(headers[0].payload_commitment()))
418 .await
419 .unwrap();
420 assert_eq!(
421 proof
422 .verify(headers[1].block_merkle_tree_root())
423 .unwrap()
424 .payload_commitment(),
425 headers[0].payload_commitment()
426 );
427 }
428
429 #[tokio::test]
430 #[test_log::test]
431 async fn test_payload_proof() {
432 let port =
433 test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
434 let url: Url = format!("http://localhost:{port}").parse().unwrap();
435
436 let test_config = TestConfigBuilder::default().build();
437 let storage = DataSource::create_storage().await;
438 let persistence =
439 <DataSource as TestableSequencerDataSource>::persistence_options(&storage);
440
441 let config = TestNetworkConfigBuilder::<1, _, _>::with_num_nodes()
442 .api_config(
443 DataSource::options(&storage, Options::with_port(port))
444 .light_client(Default::default()),
445 )
446 .persistences([persistence])
447 .network_config(test_config)
448 .build();
449
450 let _network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
451 let client = QueryServiceClient::new(url);
452
453 let block: BlockQueryData<SeqTypes> = client
455 .client
456 .socket("availability/stream/blocks/1")
457 .subscribe()
458 .await
459 .unwrap()
460 .next()
461 .await
462 .unwrap()
463 .unwrap();
464
465 let proof = client.payload_proof(1).await.unwrap();
466 assert_eq!(proof.verify(block.header()).unwrap(), *block.payload());
467
468 let ns = NamespaceId::from(0u64);
471 let ns_proof = client.namespace_proof(1, ns).await.unwrap();
472 assert_eq!(ns_proof.verify(block.header(), ns).unwrap(), vec![]);
473 }
474
475 #[tokio::test]
476 #[test_log::test]
477 async fn test_namespace_proof() {
478 let port =
479 test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
480 let url: Url = format!("http://localhost:{port}").parse().unwrap();
481
482 let test_config = TestConfigBuilder::default().build();
483 let storage = DataSource::create_storage().await;
484 let persistence =
485 <DataSource as TestableSequencerDataSource>::persistence_options(&storage);
486
487 let config = TestNetworkConfigBuilder::<1, _, _>::with_num_nodes()
488 .api_config(
489 DataSource::options(&storage, Options::with_port(port))
490 .light_client(Default::default()),
491 )
492 .persistences([persistence])
493 .network_config(test_config)
494 .build();
495
496 let network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
497 let client = QueryServiceClient::new(url);
498
499 let ns = NamespaceId::from(1u64);
501 let mut events = network.server.event_stream().await;
502 let mut txs = vec![];
503 let mut headers = vec![];
504 for _ in 0..2 {
505 let mut bytes = vec![0; 32];
506 rand::thread_rng().fill_bytes(&mut bytes);
507 let tx = Transaction::new(ns, bytes);
508 network
509 .server
510 .consensus()
511 .read()
512 .await
513 .submit_transaction(tx.clone())
514 .await
515 .unwrap();
516 let block = wait_for_decide_on_handle(&mut events, &tx).await.0;
517 tracing::info!(block, hash = %tx.commit(), ?tx, "transaction included");
518
519 let header: Header = client
520 .client
521 .get(&format!("availability/header/{block}"))
522 .send()
523 .await
524 .unwrap();
525
526 txs.push(tx);
527 headers.push(header);
528 }
529
530 for (tx, header) in txs.iter().zip(&headers) {
532 let proof = client.namespace_proof(header.height(), ns).await.unwrap();
533 assert_eq!(proof.verify(header, ns).unwrap(), std::slice::from_ref(tx));
534 }
535
536 let proofs = client
538 .namespace_proofs_in_range(headers[0].height(), headers[1].height() + 1, ns)
539 .await
540 .unwrap();
541 assert_eq!(
542 proofs.len() as u64,
543 headers[1].height() + 1 - headers[0].height()
544 );
545 assert_eq!(
546 proofs[0].verify(&headers[0], ns).unwrap(),
547 std::slice::from_ref(&txs[0])
548 );
549 assert_eq!(
550 proofs[proofs.len() - 1].verify(&headers[1], ns).unwrap(),
551 std::slice::from_ref(&txs[1])
552 );
553 for (i, proof) in proofs.iter().enumerate().take(proofs.len() - 1).skip(1) {
555 let header = client
556 .client
557 .get(&format!(
558 "availability/header/{}",
559 headers[0].height() + (i as u64)
560 ))
561 .send()
562 .await
563 .unwrap();
564 assert_eq!(proof.verify(&header, ns).unwrap(), vec![]);
565 }
566 }
567}