espresso_node/request_response/
mod.rs1use std::future::Future;
2
3use data_source::DataSource;
4use derive_more::derive::Deref;
5use espresso_types::{PubKey, SeqTypes, traits::SequencerPersistence};
6use hotshot::{traits::NodeImplementation, types::BLSPrivKey};
7use hotshot_types::traits::network::ConnectedNetwork;
8use network::Sender;
9use recipient_source::RecipientSource;
10use request::{Request, Response};
11use request_response::{
12 RequestError, RequestResponse, RequestResponseConfig, RequestType, network::Bytes,
13};
14use tokio::sync::mpsc::Receiver;
15
16pub mod catchup;
17pub mod data_source;
18pub mod network;
19pub mod recipient_source;
20pub mod request;
21
22#[derive(Clone, Deref)]
25pub struct RequestResponseProtocol<
26 I: NodeImplementation<SeqTypes>,
27 N: ConnectedNetwork<PubKey>,
28 P: SequencerPersistence,
29> {
30 #[deref]
31 #[allow(clippy::type_complexity)]
32 inner: RequestResponse<
34 Sender,
35 Receiver<Bytes>,
36 Request,
37 RecipientSource<I>,
38 DataSource<I, N, P>,
39 PubKey,
40 >,
41
42 config: RequestResponseConfig,
45
46 public_key: PubKey,
48 private_key: BLSPrivKey,
50}
51
52impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
53 RequestResponseProtocol<I, N, P>
54{
55 pub fn new(
57 config: RequestResponseConfig,
59 sender: Sender,
61 receiver: Receiver<Bytes>,
63 recipient_source: RecipientSource<I>,
66 data_source: DataSource<I, N, P>,
69 public_key: PubKey,
71 private_key: BLSPrivKey,
73 ) -> Self {
74 Self {
75 inner: RequestResponse::new(
76 config.clone(),
77 sender,
78 receiver,
79 recipient_source,
80 data_source,
81 ),
82 config,
83 public_key,
84 private_key,
85 }
86 }
87}
88
89impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
90 RequestResponseProtocol<I, N, P>
91{
92 pub async fn request_indefinitely<F, Fut, O>(
93 &self,
94 request: Request,
96 request_type: RequestType,
98 response_validation_fn: F,
100 ) -> std::result::Result<O, RequestError>
101 where
102 F: Fn(&Request, Response) -> Fut + Send + Sync + 'static + Clone,
103 Fut: Future<Output = anyhow::Result<O>> + Send + Sync + 'static,
104 O: Send + Sync + 'static + Clone,
105 {
106 self.inner
108 .request_indefinitely(
109 &self.public_key,
110 &self.private_key,
111 request_type,
112 self.config.incoming_request_ttl,
113 request,
114 response_validation_fn,
115 )
116 .await
117 }
118}