espresso_node/request_response/
mod.rs

1use std::future::Future;
2
3use data_source::DataSource;
4use derive_more::derive::Deref;
5use espresso_types::{PubKey, SeqTypes, traits::SequencerPersistence};
6use hotshot::{traits::NodeImplementation, types::BLSPrivKey};
7use hotshot_types::traits::network::ConnectedNetwork;
8use network::Sender;
9use recipient_source::RecipientSource;
10use request::{Request, Response};
11use request_response::{
12    RequestError, RequestResponse, RequestResponseConfig, RequestType, network::Bytes,
13};
14use tokio::sync::mpsc::Receiver;
15
16pub mod catchup;
17pub mod data_source;
18pub mod network;
19pub mod recipient_source;
20pub mod request;
21
22/// A concrete type wrapper around `RequestResponse`. We need this so that we can implement
23/// local traits like `StateCatchup`. It also helps with readability.
24#[derive(Clone, Deref)]
25pub struct RequestResponseProtocol<
26    I: NodeImplementation<SeqTypes>,
27    N: ConnectedNetwork<PubKey>,
28    P: SequencerPersistence,
29> {
30    #[deref]
31    #[allow(clippy::type_complexity)]
32    /// The actual inner request response protocol
33    inner: RequestResponse<
34        Sender,
35        Receiver<Bytes>,
36        Request,
37        RecipientSource<I>,
38        DataSource<I, N, P>,
39        PubKey,
40    >,
41
42    /// The configuration we used for the above inner protocol. This is nice to have for
43    /// estimating when we should make another request
44    config: RequestResponseConfig,
45
46    /// The public key of this node
47    public_key: PubKey,
48    /// The private key of this node
49    private_key: BLSPrivKey,
50}
51
52impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
53    RequestResponseProtocol<I, N, P>
54{
55    /// Create a new RequestResponseProtocol from the inner
56    pub fn new(
57        // The configuration for the protocol
58        config: RequestResponseConfig,
59        // The network sender that [`RequestResponseProtocol`] will use to send messages
60        sender: Sender,
61        // The network receiver that [`RequestResponseProtocol`] will use to receive messages
62        receiver: Receiver<Bytes>,
63        // The recipient source that [`RequestResponseProtocol`] will use to get the recipients
64        // that a specific message should expect responses from
65        recipient_source: RecipientSource<I>,
66        // The [response] data source that [`RequestResponseProtocol`] will use to derive the
67        // response data for a specific request
68        data_source: DataSource<I, N, P>,
69        // The public key of this node
70        public_key: PubKey,
71        // The private key of this node
72        private_key: BLSPrivKey,
73    ) -> Self {
74        Self {
75            inner: RequestResponse::new(
76                config.clone(),
77                sender,
78                receiver,
79                recipient_source,
80                data_source,
81            ),
82            config,
83            public_key,
84            private_key,
85        }
86    }
87}
88
89impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
90    RequestResponseProtocol<I, N, P>
91{
92    pub async fn request_indefinitely<F, Fut, O>(
93        &self,
94        // The request to make
95        request: Request,
96        // The type of request
97        request_type: RequestType,
98        // The response validation function
99        response_validation_fn: F,
100    ) -> std::result::Result<O, RequestError>
101    where
102        F: Fn(&Request, Response) -> Fut + Send + Sync + 'static + Clone,
103        Fut: Future<Output = anyhow::Result<O>> + Send + Sync + 'static,
104        O: Send + Sync + 'static + Clone,
105    {
106        // Request from the inner protocol
107        self.inner
108            .request_indefinitely(
109                &self.public_key,
110                &self.private_key,
111                request_type,
112                self.config.incoming_request_ttl,
113                request,
114                response_validation_fn,
115            )
116            .await
117    }
118}