Skip to main content

hotshot_new_protocol/
client.rs

1use std::{num::NonZeroUsize, sync::Arc};
2
3use async_trait::async_trait;
4use committable::Commitment;
5use hotshot_types::{
6    data::{EpochNumber, Leaf2, ViewNumber},
7    message::Proposal as SignedProposal,
8    simple_vote::TimeoutVote2,
9    traits::{leaf_fetcher_network::LeafFetcherNetwork, node_implementation::NodeType},
10    utils::StateAndDelta,
11};
12use tokio::sync::{mpsc, oneshot};
13
14use crate::{
15    consensus::PreCutoverSeed, coordinator::error::CoordinatorError, message::Proposal,
16    state::UpdateLeaf,
17};
18
19#[derive(Clone)]
20pub struct ClientApi<T: NodeType> {
21    tx: mpsc::Sender<ClientRequest<T>>,
22}
23
24impl<T: NodeType> ClientApi<T> {
25    pub async fn current_view(&self) -> Result<ViewNumber, QueryError> {
26        let (tx, rx) = oneshot::channel();
27        self.call(ClientRequest::CurrentView(tx), rx).await
28    }
29
30    pub async fn current_epoch(&self) -> Result<Option<EpochNumber>, QueryError> {
31        let (tx, rx) = oneshot::channel();
32        self.call(ClientRequest::CurrentEpoch(tx), rx).await
33    }
34
35    pub async fn decided_leaf(&self) -> Result<Leaf2<T>, QueryError> {
36        let (tx, rx) = oneshot::channel();
37        self.call(ClientRequest::DecidedLeaf(tx), rx).await
38    }
39
40    pub async fn decided_state(&self) -> Result<Option<Arc<T::ValidatedState>>, QueryError> {
41        let (tx, rx) = oneshot::channel();
42        self.call(ClientRequest::DecidedState(tx), rx).await
43    }
44
45    pub async fn undecided_leaves(&self) -> Result<Vec<Leaf2<T>>, QueryError> {
46        let (tx, rx) = oneshot::channel();
47        self.call(ClientRequest::UndecidedLeaves(tx), rx).await
48    }
49
50    pub async fn state(
51        &self,
52        view: ViewNumber,
53    ) -> Result<Option<Arc<T::ValidatedState>>, QueryError> {
54        let (tx, rx) = oneshot::channel();
55        self.call(ClientRequest::GetState { view, respond: tx }, rx)
56            .await
57    }
58
59    pub async fn state_and_delta(&self, view: ViewNumber) -> Result<StateAndDelta<T>, QueryError> {
60        let (tx, rx) = oneshot::channel();
61        self.call(ClientRequest::GetStateAndDelta { view, respond: tx }, rx)
62            .await
63    }
64
65    pub async fn update_leaf(&self, update: UpdateLeaf<T>) -> Result<(), QueryError> {
66        let (tx, rx) = oneshot::channel();
67        self.call(
68            ClientRequest::UpdateLeaf {
69                update,
70                respond: tx,
71            },
72            rx,
73        )
74        .await
75    }
76
77    pub async fn submit_transaction(&self, tx: T::Transaction) -> Result<(), QueryError> {
78        let (respond, rx) = oneshot::channel();
79        self.call(ClientRequest::SubmitTransaction { tx, respond }, rx)
80            .await
81    }
82
83    pub async fn request_proposal(
84        &self,
85        view: ViewNumber,
86        leaf_commitment: Commitment<Leaf2<T>>,
87    ) -> Result<SignedProposal<T, Proposal<T>>, QueryError> {
88        let (respond, rx) = oneshot::channel();
89        self.call(
90            ClientRequest::RequestProposal {
91                view,
92                leaf_commitment,
93                respond,
94            },
95            rx,
96        )
97        .await?
98    }
99
100    pub async fn send_external_message(
101        &self,
102        view: ViewNumber,
103        payload: Vec<u8>,
104        recipient: T::SignatureKey,
105    ) -> Result<(), QueryError> {
106        let (respond, rx) = oneshot::channel();
107        self.call(
108            ClientRequest::SendExternalMessage {
109                view,
110                payload,
111                recipient,
112                respond,
113            },
114            rx,
115        )
116        .await?
117    }
118
119    /// Forward a legacy `TimeoutVote2` into the new-protocol timeout collectors.
120    pub async fn submit_timeout_vote(&self, vote: TimeoutVote2<T>) -> Result<(), QueryError> {
121        let (respond, rx) = oneshot::channel();
122        self.call(ClientRequest::SubmitTimeoutVote { vote, respond }, rx)
123            .await
124    }
125
126    /// Refresh the coordinator network's peer set for `epoch`.
127    pub async fn bump_network_epoch(&self, epoch: EpochNumber) -> Result<(), QueryError> {
128        let (respond, rx) = oneshot::channel();
129        self.call(ClientRequest::BumpNetworkEpoch { epoch, respond }, rx)
130            .await
131    }
132
133    /// Bridge legacy state into the coordinator at the cutover.
134    /// Idempotent at the consensus layer.
135    pub async fn seed_pre_cutover(&self, seed: PreCutoverSeed<T>) -> Result<(), QueryError> {
136        let (respond, rx) = oneshot::channel();
137        self.call(ClientRequest::SeedPreCutover { seed, respond }, rx)
138            .await
139    }
140
141    async fn call<A>(
142        &self,
143        request: ClientRequest<T>,
144        rx: oneshot::Receiver<A>,
145    ) -> Result<A, QueryError> {
146        self.tx
147            .send(request)
148            .await
149            .map_err(|_| QueryError::ChannelClosed)?;
150        rx.await.map_err(|_| QueryError::ResponseDropped)
151    }
152}
153
154/// The coordinator client owns the receive end of the request channel.
155///
156/// The coordinator holds this and calls [`next_request`](CoordinatorClient::next_request)
157/// to process incoming requests.
158pub struct CoordinatorClient<T: NodeType> {
159    rx: mpsc::Receiver<ClientRequest<T>>,
160    api: ClientApi<T>,
161}
162
163impl<T: NodeType> Default for CoordinatorClient<T> {
164    fn default() -> Self {
165        Self::new(NonZeroUsize::new(256).expect("256 > 0"))
166    }
167}
168
169impl<T: NodeType> CoordinatorClient<T> {
170    pub fn new(capacity: NonZeroUsize) -> Self {
171        let (tx, rx) = mpsc::channel(capacity.get());
172        Self {
173            rx,
174            api: ClientApi { tx },
175        }
176    }
177
178    pub fn handle(&self) -> &ClientApi<T> {
179        &self.api
180    }
181
182    pub(crate) async fn next_request(&mut self) -> Option<ClientRequest<T>> {
183        self.rx.recv().await
184    }
185}
186
187#[allow(clippy::large_enum_variant)]
188pub(crate) enum ClientRequest<T: NodeType> {
189    CurrentView(oneshot::Sender<ViewNumber>),
190    CurrentEpoch(oneshot::Sender<Option<EpochNumber>>),
191    DecidedLeaf(oneshot::Sender<Leaf2<T>>),
192    DecidedState(oneshot::Sender<Option<Arc<T::ValidatedState>>>),
193    UndecidedLeaves(oneshot::Sender<Vec<Leaf2<T>>>),
194    GetState {
195        view: ViewNumber,
196        respond: oneshot::Sender<Option<Arc<T::ValidatedState>>>,
197    },
198    GetStateAndDelta {
199        view: ViewNumber,
200        respond: oneshot::Sender<StateAndDelta<T>>,
201    },
202    UpdateLeaf {
203        update: UpdateLeaf<T>,
204        respond: oneshot::Sender<()>,
205    },
206    SubmitTransaction {
207        tx: T::Transaction,
208        respond: oneshot::Sender<()>,
209    },
210    RequestProposal {
211        view: ViewNumber,
212        leaf_commitment: Commitment<Leaf2<T>>,
213        respond: oneshot::Sender<Result<SignedProposal<T, Proposal<T>>, QueryError>>,
214    },
215    SendExternalMessage {
216        view: ViewNumber,
217        payload: Vec<u8>,
218        recipient: T::SignatureKey,
219        respond: oneshot::Sender<Result<(), QueryError>>,
220    },
221    SeedPreCutover {
222        seed: PreCutoverSeed<T>,
223        respond: oneshot::Sender<()>,
224    },
225    SubmitTimeoutVote {
226        vote: TimeoutVote2<T>,
227        respond: oneshot::Sender<()>,
228    },
229    BumpNetworkEpoch {
230        epoch: EpochNumber,
231        respond: oneshot::Sender<()>,
232    },
233}
234
235#[derive(Debug, thiserror::Error)]
236#[non_exhaustive]
237pub enum QueryError {
238    #[error("failed to send request. coordinator channel closed")]
239    ChannelClosed,
240
241    #[error("coordinator dropped the response")]
242    ResponseDropped,
243
244    #[error("coordinator error: {0}")]
245    Coordinator(#[from] CoordinatorError),
246}
247
248/// `LeafFetcherNetwork` impl that routes catchup direct-messages through
249/// the `Coordinator`'s single owned network via [`ClientApi`].
250///
251/// The membership layer gets a clone of this so it does not need its own
252/// network handle — the `Coordinator` is the only owner of the underlying
253/// `ConnectedNetwork`.
254pub struct ClientLeafFetcherNetwork<T: NodeType> {
255    client: ClientApi<T>,
256}
257
258impl<T: NodeType> ClientLeafFetcherNetwork<T> {
259    pub fn new(client: ClientApi<T>) -> Self {
260        Self { client }
261    }
262}
263
264#[async_trait]
265impl<T: NodeType> LeafFetcherNetwork<T> for ClientLeafFetcherNetwork<T> {
266    async fn send_leaf_request(
267        &self,
268        view: ViewNumber,
269        payload: Vec<u8>,
270        recipient: T::SignatureKey,
271    ) -> anyhow::Result<()> {
272        self.client
273            .send_external_message(view, payload, recipient)
274            .await?;
275        Ok(())
276    }
277
278    async fn send_leaf_response(
279        &self,
280        view: ViewNumber,
281        payload: Vec<u8>,
282        recipient: T::SignatureKey,
283    ) -> anyhow::Result<()> {
284        self.client
285            .send_external_message(view, payload, recipient)
286            .await?;
287        Ok(())
288    }
289}