1use std::{num::NonZeroUsize, sync::Arc};
2
3use async_trait::async_trait;
4use committable::Commitment;
5use hotshot_types::{
6 data::{EpochNumber, Leaf2, ViewNumber},
7 message::Proposal as SignedProposal,
8 simple_vote::TimeoutVote2,
9 traits::{leaf_fetcher_network::LeafFetcherNetwork, node_implementation::NodeType},
10 utils::StateAndDelta,
11};
12use tokio::sync::{mpsc, oneshot};
13
14use crate::{
15 consensus::PreCutoverSeed, coordinator::error::CoordinatorError, message::Proposal,
16 state::UpdateLeaf,
17};
18
19#[derive(Clone)]
20pub struct ClientApi<T: NodeType> {
21 tx: mpsc::Sender<ClientRequest<T>>,
22}
23
24impl<T: NodeType> ClientApi<T> {
25 pub async fn current_view(&self) -> Result<ViewNumber, QueryError> {
26 let (tx, rx) = oneshot::channel();
27 self.call(ClientRequest::CurrentView(tx), rx).await
28 }
29
30 pub async fn current_epoch(&self) -> Result<Option<EpochNumber>, QueryError> {
31 let (tx, rx) = oneshot::channel();
32 self.call(ClientRequest::CurrentEpoch(tx), rx).await
33 }
34
35 pub async fn decided_leaf(&self) -> Result<Leaf2<T>, QueryError> {
36 let (tx, rx) = oneshot::channel();
37 self.call(ClientRequest::DecidedLeaf(tx), rx).await
38 }
39
40 pub async fn decided_state(&self) -> Result<Option<Arc<T::ValidatedState>>, QueryError> {
41 let (tx, rx) = oneshot::channel();
42 self.call(ClientRequest::DecidedState(tx), rx).await
43 }
44
45 pub async fn undecided_leaves(&self) -> Result<Vec<Leaf2<T>>, QueryError> {
46 let (tx, rx) = oneshot::channel();
47 self.call(ClientRequest::UndecidedLeaves(tx), rx).await
48 }
49
50 pub async fn state(
51 &self,
52 view: ViewNumber,
53 ) -> Result<Option<Arc<T::ValidatedState>>, QueryError> {
54 let (tx, rx) = oneshot::channel();
55 self.call(ClientRequest::GetState { view, respond: tx }, rx)
56 .await
57 }
58
59 pub async fn state_and_delta(&self, view: ViewNumber) -> Result<StateAndDelta<T>, QueryError> {
60 let (tx, rx) = oneshot::channel();
61 self.call(ClientRequest::GetStateAndDelta { view, respond: tx }, rx)
62 .await
63 }
64
65 pub async fn update_leaf(&self, update: UpdateLeaf<T>) -> Result<(), QueryError> {
66 let (tx, rx) = oneshot::channel();
67 self.call(
68 ClientRequest::UpdateLeaf {
69 update,
70 respond: tx,
71 },
72 rx,
73 )
74 .await
75 }
76
77 pub async fn submit_transaction(&self, tx: T::Transaction) -> Result<(), QueryError> {
78 let (respond, rx) = oneshot::channel();
79 self.call(ClientRequest::SubmitTransaction { tx, respond }, rx)
80 .await
81 }
82
83 pub async fn request_proposal(
84 &self,
85 view: ViewNumber,
86 leaf_commitment: Commitment<Leaf2<T>>,
87 ) -> Result<SignedProposal<T, Proposal<T>>, QueryError> {
88 let (respond, rx) = oneshot::channel();
89 self.call(
90 ClientRequest::RequestProposal {
91 view,
92 leaf_commitment,
93 respond,
94 },
95 rx,
96 )
97 .await?
98 }
99
100 pub async fn send_external_message(
101 &self,
102 view: ViewNumber,
103 payload: Vec<u8>,
104 recipient: T::SignatureKey,
105 ) -> Result<(), QueryError> {
106 let (respond, rx) = oneshot::channel();
107 self.call(
108 ClientRequest::SendExternalMessage {
109 view,
110 payload,
111 recipient,
112 respond,
113 },
114 rx,
115 )
116 .await?
117 }
118
119 pub async fn submit_timeout_vote(&self, vote: TimeoutVote2<T>) -> Result<(), QueryError> {
121 let (respond, rx) = oneshot::channel();
122 self.call(ClientRequest::SubmitTimeoutVote { vote, respond }, rx)
123 .await
124 }
125
126 pub async fn bump_network_epoch(&self, epoch: EpochNumber) -> Result<(), QueryError> {
128 let (respond, rx) = oneshot::channel();
129 self.call(ClientRequest::BumpNetworkEpoch { epoch, respond }, rx)
130 .await
131 }
132
133 pub async fn seed_pre_cutover(&self, seed: PreCutoverSeed<T>) -> Result<(), QueryError> {
136 let (respond, rx) = oneshot::channel();
137 self.call(ClientRequest::SeedPreCutover { seed, respond }, rx)
138 .await
139 }
140
141 async fn call<A>(
142 &self,
143 request: ClientRequest<T>,
144 rx: oneshot::Receiver<A>,
145 ) -> Result<A, QueryError> {
146 self.tx
147 .send(request)
148 .await
149 .map_err(|_| QueryError::ChannelClosed)?;
150 rx.await.map_err(|_| QueryError::ResponseDropped)
151 }
152}
153
154pub struct CoordinatorClient<T: NodeType> {
159 rx: mpsc::Receiver<ClientRequest<T>>,
160 api: ClientApi<T>,
161}
162
163impl<T: NodeType> Default for CoordinatorClient<T> {
164 fn default() -> Self {
165 Self::new(NonZeroUsize::new(256).expect("256 > 0"))
166 }
167}
168
169impl<T: NodeType> CoordinatorClient<T> {
170 pub fn new(capacity: NonZeroUsize) -> Self {
171 let (tx, rx) = mpsc::channel(capacity.get());
172 Self {
173 rx,
174 api: ClientApi { tx },
175 }
176 }
177
178 pub fn handle(&self) -> &ClientApi<T> {
179 &self.api
180 }
181
182 pub(crate) async fn next_request(&mut self) -> Option<ClientRequest<T>> {
183 self.rx.recv().await
184 }
185}
186
187#[allow(clippy::large_enum_variant)]
188pub(crate) enum ClientRequest<T: NodeType> {
189 CurrentView(oneshot::Sender<ViewNumber>),
190 CurrentEpoch(oneshot::Sender<Option<EpochNumber>>),
191 DecidedLeaf(oneshot::Sender<Leaf2<T>>),
192 DecidedState(oneshot::Sender<Option<Arc<T::ValidatedState>>>),
193 UndecidedLeaves(oneshot::Sender<Vec<Leaf2<T>>>),
194 GetState {
195 view: ViewNumber,
196 respond: oneshot::Sender<Option<Arc<T::ValidatedState>>>,
197 },
198 GetStateAndDelta {
199 view: ViewNumber,
200 respond: oneshot::Sender<StateAndDelta<T>>,
201 },
202 UpdateLeaf {
203 update: UpdateLeaf<T>,
204 respond: oneshot::Sender<()>,
205 },
206 SubmitTransaction {
207 tx: T::Transaction,
208 respond: oneshot::Sender<()>,
209 },
210 RequestProposal {
211 view: ViewNumber,
212 leaf_commitment: Commitment<Leaf2<T>>,
213 respond: oneshot::Sender<Result<SignedProposal<T, Proposal<T>>, QueryError>>,
214 },
215 SendExternalMessage {
216 view: ViewNumber,
217 payload: Vec<u8>,
218 recipient: T::SignatureKey,
219 respond: oneshot::Sender<Result<(), QueryError>>,
220 },
221 SeedPreCutover {
222 seed: PreCutoverSeed<T>,
223 respond: oneshot::Sender<()>,
224 },
225 SubmitTimeoutVote {
226 vote: TimeoutVote2<T>,
227 respond: oneshot::Sender<()>,
228 },
229 BumpNetworkEpoch {
230 epoch: EpochNumber,
231 respond: oneshot::Sender<()>,
232 },
233}
234
235#[derive(Debug, thiserror::Error)]
236#[non_exhaustive]
237pub enum QueryError {
238 #[error("failed to send request. coordinator channel closed")]
239 ChannelClosed,
240
241 #[error("coordinator dropped the response")]
242 ResponseDropped,
243
244 #[error("coordinator error: {0}")]
245 Coordinator(#[from] CoordinatorError),
246}
247
248pub struct ClientLeafFetcherNetwork<T: NodeType> {
255 client: ClientApi<T>,
256}
257
258impl<T: NodeType> ClientLeafFetcherNetwork<T> {
259 pub fn new(client: ClientApi<T>) -> Self {
260 Self { client }
261 }
262}
263
264#[async_trait]
265impl<T: NodeType> LeafFetcherNetwork<T> for ClientLeafFetcherNetwork<T> {
266 async fn send_leaf_request(
267 &self,
268 view: ViewNumber,
269 payload: Vec<u8>,
270 recipient: T::SignatureKey,
271 ) -> anyhow::Result<()> {
272 self.client
273 .send_external_message(view, payload, recipient)
274 .await?;
275 Ok(())
276 }
277
278 async fn send_leaf_response(
279 &self,
280 view: ViewNumber,
281 payload: Vec<u8>,
282 recipient: T::SignatureKey,
283 ) -> anyhow::Result<()> {
284 self.client
285 .send_external_message(view, payload, recipient)
286 .await?;
287 Ok(())
288 }
289}