1pub mod bootstrap;
9use std::{collections::HashMap, marker::PhantomData, num::NonZeroUsize, time::Duration};
10
11use futures::{
13 SinkExt,
14 channel::{mpsc, oneshot::Sender},
15};
16use hotshot_types::traits::signature_key::SignatureKey;
17use lazy_static::lazy_static;
18use libp2p::kad::{
19 Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent, store::RecordStore,
20};
21use libp2p::kad::{
22 BootstrapOk, GetClosestPeersOk, GetRecordOk, GetRecordResult, ProgressStep, PutRecordResult,
23 QueryId, QueryResult, Record, store::MemoryStore,
24};
25use libp2p_identity::PeerId;
26use store::{
27 persistent::{DhtPersistentStorage, PersistentStore},
28 validated::ValidatedStore,
29};
30use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
31use tracing::{debug, error, warn};
32
33pub mod record;
35
36pub mod store;
38
lazy_static! {
    /// Lazily-initialised non-zero constant with the value 50.
    // NOTE(review): presumably the upper bound on the size of a single DHT query;
    // nothing in this file reads it, so confirm against the users in sibling modules.
    static ref MAX_DHT_QUERY_SIZE: NonZeroUsize = NonZeroUsize::new(50).unwrap();
}
43
44use super::exponential_backoff::ExponentialBackoff;
45use crate::network::{ClientRequest, NetworkEvent, log_summary::LogEvent};
46
/// Bookkeeping layered on top of libp2p's Kademlia behaviour.
///
/// Tracks in-flight get-record, put-record and get-closest-peers queries so that
/// Kademlia progress events can be routed back to the clients waiting on them,
/// and holds the channels used to schedule retries and to report bootstrap
/// completion.
#[derive(Debug)]
pub struct DHTBehaviour<K: SignatureKey + 'static, D: DhtPersistentStorage> {
    /// In-flight "get closest peers" queries; the sender is signalled with `()`
    /// when the final progress event for the query arrives (success or failure).
    pub in_progress_get_closest_peers: HashMap<QueryId, Sender<()>>,
    /// In-flight get-record queries, keyed by their Kademlia query id.
    in_progress_record_queries: HashMap<QueryId, KadGetQuery>,
    /// Maps a record key to the query currently fetching it, so that concurrent
    /// requests for the same key share one Kademlia query.
    outstanding_dht_query_keys: HashMap<Vec<u8>, QueryId>,
    /// In-flight put-record queries, keyed by their Kademlia query id.
    in_progress_put_record_queries: HashMap<QueryId, KadPutQuery>,
    /// Bootstrap state and its backoff; initialised to `State::NotStarted` by `new`.
    pub bootstrap_state: Bootstrap,
    /// The local peer id; only used in log messages within this file.
    pub peer_id: PeerId,
    /// Replication factor supplied at construction and returned by `replication_factor()`.
    pub replication_factor: NonZeroUsize,
    /// Channel on which failed get/put requests are re-submitted after a backoff.
    /// `None` until `set_retry` is called, in which case retries are silently skipped.
    retry_tx: Option<UnboundedSender<ClientRequest>>,
    /// Channel on which `BootstrapFinished` is reported.
    /// `None` until `set_bootstrap_sender` is called.
    bootstrap_tx: Option<mpsc::Sender<bootstrap::InputEvent>>,

    /// Ties the behaviour to its key type `K` and storage type `D` without storing either.
    phantom: PhantomData<(K, D)>,
}
77
/// Bootstrap bookkeeping: the current state plus the backoff associated with it.
#[derive(Debug, Clone)]
pub struct Bootstrap {
    /// Whether bootstrapping has been started.
    pub state: State,
    /// Backoff stored alongside the state.
    // NOTE(review): presumably paces repeated bootstrap attempts; it is only
    // constructed, never read, in this file — confirm against the bootstrap module.
    pub backoff: ExponentialBackoff,
}
86
/// Bootstrap lifecycle marker stored in [`Bootstrap::state`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// Initial value assigned by `DHTBehaviour::new`.
    NotStarted,
    /// Bootstrapping has been started.
    Started,
}
95
/// Events emitted by the DHT layer.
// NOTE(review): not referenced anywhere in this file (`dht_handle_event` returns
// `NetworkEvent` instead) — confirm whether this type is still in use.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DHTEvent {
    /// The node has bootstrapped.
    IsBootstrapped,
}
102
103impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
104 pub fn set_retry(&mut self, tx: UnboundedSender<ClientRequest>) {
106 self.retry_tx = Some(tx);
107 }
108 pub fn set_bootstrap_sender(&mut self, tx: mpsc::Sender<bootstrap::InputEvent>) {
110 self.bootstrap_tx = Some(tx);
111 }
112 #[must_use]
114 pub fn new(pid: PeerId, replication_factor: NonZeroUsize) -> Self {
115 Self {
121 peer_id: pid,
122 in_progress_record_queries: HashMap::default(),
123 in_progress_put_record_queries: HashMap::default(),
124 outstanding_dht_query_keys: HashMap::default(),
125 bootstrap_state: Bootstrap {
126 state: State::NotStarted,
127 backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
128 },
129 in_progress_get_closest_peers: HashMap::default(),
130 replication_factor,
131 retry_tx: None,
132 bootstrap_tx: None,
133 phantom: PhantomData,
134 }
135 }
136
137 pub fn print_routing_table(
139 &mut self,
140 kadem: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
141 ) {
142 let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id);
143 let v = kadem.kbuckets().collect::<Vec<_>>();
144 for i in v {
145 for j in i.iter() {
146 let s = format!(
147 "node: key: {:?}, val {:?}, status: {:?}",
148 j.node.key, j.node.value, j.status
149 );
150 err.push_str(&s);
151 }
152 }
153 error!("{:?}", err);
154 }
155
/// Returns the replication factor this behaviour was constructed with.
#[must_use]
pub fn replication_factor(&self) -> NonZeroUsize {
    self.replication_factor
}
/// Registers an already-issued put-record query so that its result can be
/// routed to `query.notify` (or retried) by `handle_put_query`.
///
/// * `id` - the Kademlia query id of the put.
/// * `query` - the bookkeeping for that put.
pub fn put_record(&mut self, id: QueryId, query: KadPutQuery) {
    // An entry already stored under `id` would be replaced and dropped here.
    self.in_progress_put_record_queries.insert(id, query);
}
167
168 pub fn get_record(
170 &mut self,
171 key: Vec<u8>,
172 chans: Vec<Sender<Vec<u8>>>,
173 backoff: ExponentialBackoff,
174 retry_count: u8,
175 kad: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
176 ) {
177 if retry_count == 0 {
179 return;
180 }
181
182 if let Some(entry) = kad.store_mut().get(&key.clone().into()) {
184 for chan in chans {
186 if chan.send(entry.value.clone()).is_err() {
187 debug!(
188 "Get DHT: channel closed before get record request result could be sent"
189 );
190 }
191 }
192 } else {
193 if let Some(qid) = self.outstanding_dht_query_keys.get(&key) {
195 let Some(query) = self.in_progress_record_queries.get_mut(qid) else {
198 warn!("Get DHT: outstanding query not found");
199 return;
200 };
201
202 query.notify.extend(chans);
204 } else {
205 let qid = kad.get_record(key.clone().into());
207 let query = KadGetQuery {
208 backoff,
209 progress: DHTProgress::InProgress(qid),
210 notify: chans,
211 key: key.clone(),
212 retry_count: retry_count - 1,
213 records: Vec::new(),
214 };
215
216 self.outstanding_dht_query_keys.insert(key, qid);
218 self.in_progress_record_queries.insert(qid, query);
219 }
220 }
221 }
222
223 fn retry_get(&self, mut query: KadGetQuery) {
225 let Some(tx) = self.retry_tx.clone() else {
226 return;
227 };
228 let req = ClientRequest::GetDHT {
229 key: query.key,
230 notify: query.notify,
231 retry_count: query.retry_count,
232 };
233 let backoff = query.backoff.next_timeout(false);
234 spawn(async move {
235 sleep(backoff).await;
236 let _ = tx.send(req);
237 });
238 }
239
240 fn retry_put(&self, mut query: KadPutQuery) {
242 let Some(tx) = self.retry_tx.clone() else {
243 return;
244 };
245 let req = ClientRequest::PutDHT {
246 key: query.key,
247 value: query.value,
248 notify: query.notify,
249 };
250 spawn(async move {
251 sleep(query.backoff.next_timeout(false)).await;
252 let _ = tx.send(req);
253 });
254 }
255
256 fn handle_get_query(
258 &mut self,
259 store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
260 record_results: GetRecordResult,
261 id: QueryId,
262 mut last: bool,
263 ) {
264 let found = match self.in_progress_record_queries.get_mut(&id) {
265 Some(query) => match record_results {
266 Ok(results) => match results {
267 GetRecordOk::FoundRecord(record) => {
268 if record.record.expires.is_some() {
270 query.records.push(record.record);
271 true
272 } else {
273 false
274 }
275 },
276 GetRecordOk::FinishedWithNoAdditionalRecord {
277 cache_candidates: _,
278 } => {
279 tracing::debug!("GetRecord Finished with No Additional Record");
280 last = true;
281 false
282 },
283 },
284 Err(err) => {
285 LogEvent::DhtKadQueryError.record();
286 debug!("Error in Kademlia query: {err:?}");
287 false
288 },
289 },
290 None => {
291 return;
294 },
295 };
296
297 if (found || last)
299 && let Some(KadGetQuery {
300 backoff,
301 progress,
302 notify,
303 key,
304 retry_count,
305 records,
306 }) = self.in_progress_record_queries.remove(&id)
307 {
308 self.outstanding_dht_query_keys.remove(&key);
310
311 let notify = notify
313 .into_iter()
314 .filter(|n| !n.is_canceled())
315 .collect::<Vec<_>>();
316
317 if notify.is_empty() {
319 return;
320 }
321
322 if let Some(record) = records.into_iter().max_by_key(|r| r.expires.unwrap()) {
324 if store.put(record.clone()).is_ok() {
326 for n in notify {
328 if n.send(record.value.clone()).is_err() {
329 debug!(
330 "Get DHT: channel closed before get record request result could \
331 be sent"
332 );
333 }
334 }
335 } else {
336 error!("Failed to store record in local store");
337 }
338 }
339 else if retry_count > 0 {
341 let new_retry_count = retry_count - 1;
342 debug!(
343 "Get DHT: Internal disagreement for get dht request {progress:?}! requerying \
344 with more nodes. {new_retry_count:?} retries left"
345 );
346 self.retry_get(KadGetQuery {
347 backoff,
348 progress: DHTProgress::NotStarted,
349 notify,
350 key,
351 retry_count: new_retry_count,
352 records: Vec::new(),
353 });
354 } else {
355 LogEvent::DhtDisagreementGivenUp.record();
356 debug!(
357 "Get DHT: Internal disagreement for get dht request {progress:?}! Giving up \
358 because out of retries. "
359 );
360 }
361 }
362 }
363
364 fn handle_put_query(&mut self, record_results: PutRecordResult, id: QueryId) {
366 if let Some(mut query) = self.in_progress_put_record_queries.remove(&id) {
367 if query.notify.is_canceled() {
369 return;
370 }
371
372 match record_results {
373 Ok(_) => {
374 if query.notify.send(()).is_err() {
375 warn!(
376 "Put DHT: client channel closed before put record request could be \
377 sent"
378 );
379 }
380 },
381 Err(e) => {
382 query.progress = DHTProgress::NotStarted;
383 query.backoff.start_next(false);
384
385 warn!(
386 "Put DHT: error performing put: {:?}. Retrying on pid {:?}.",
387 e, self.peer_id
388 );
389 self.retry_put(query);
391 },
392 }
393 } else {
394 warn!("Put DHT: completed DHT query that is no longer tracked.");
395 }
396 }
397
398 fn finish_bootstrap(&mut self) {
400 if let Some(mut tx) = self.bootstrap_tx.clone() {
401 spawn(async move { tx.send(bootstrap::InputEvent::BootstrapFinished).await });
402 }
403 }
404
/// Dispatches a Kademlia event to the matching handler.
///
/// Returns `Some(NetworkEvent::IsBootstrapped)` for the final event of a
/// successful bootstrap query and `None` for every other event.
///
/// The order of the match arms matters: the specific `OutboundQueryProgressed`
/// arms must stay ahead of the two catch-all arms at the bottom.
///
/// * `event` - the Kademlia event to handle.
/// * `store` - local record store, forwarded to `handle_get_query`.
// The match below is long but flat; splitting it would not make it clearer.
#[allow(clippy::too_many_lines)]
pub fn dht_handle_event(
    &mut self,
    event: KademliaEvent,
    store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
) -> Option<NetworkEvent> {
    match event {
        // Put-record progress: only the final step carries the outcome we act on.
        KademliaEvent::OutboundQueryProgressed {
            result: QueryResult::PutRecord(record_results),
            id,
            step: ProgressStep { last, .. },
            ..
        } => {
            if last {
                self.handle_put_query(record_results, id);
            }
        },
        // Final step of a get-closest-peers query. Intermediate steps fall
        // through to the generic `OutboundQueryProgressed` arm below.
        KademliaEvent::OutboundQueryProgressed {
            result: QueryResult::GetClosestPeers(r),
            id: query_id,
            stats: _,
            step: ProgressStep { last: true, .. },
            ..
        } => match r {
            Ok(GetClosestPeersOk { key, peers: _ }) => {
                if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id)
                    && chan.send(()).is_err()
                {
                    warn!("DHT: finished query but client was no longer interested");
                };
                debug!("Successfully got closest peers for key {key:?}");
            },
            Err(e) => {
                // The waiter is signalled on failure too, so it does not wait forever.
                if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
                    let _: Result<_, _> = chan.send(());
                };
                LogEvent::DhtClosestPeersFailure.record();
                debug!("Failed to get closest peers: {e:?}");
            },
        },
        // Get-record progress: every step is forwarded, not only the last one.
        KademliaEvent::OutboundQueryProgressed {
            result: QueryResult::GetRecord(record_results),
            id,
            step: ProgressStep { last, .. },
            ..
        } => {
            self.handle_get_query(store, record_results, id, last);
        },
        // Final step of a successful bootstrap query.
        KademliaEvent::OutboundQueryProgressed {
            result:
                QueryResult::Bootstrap(Ok(BootstrapOk {
                    peer: _,
                    num_remaining,
                })),
            step: ProgressStep { last: true, .. },
            ..
        } => {
            if num_remaining == 0 {
                self.finish_bootstrap();
            } else {
                debug!("Bootstrap in progress, {num_remaining} nodes remaining");
            }
            // NOTE(review): `IsBootstrapped` is returned even when the branch above
            // logged "in progress" — confirm that a final step can never carry a
            // non-zero `num_remaining`, or that this is intended.
            return Some(NetworkEvent::IsBootstrapped);
        },
        // Failed bootstrap, at any step. The bootstrap task is notified regardless.
        KademliaEvent::OutboundQueryProgressed {
            result: QueryResult::Bootstrap(Err(e)),
            ..
        } => {
            // Irrefutable pattern: this only compiles while `Timeout` is the sole variant.
            let BootstrapError::Timeout { num_remaining, .. } = e;
            if num_remaining.is_none() {
                error!("Failed to bootstrap: {e:?}");
            }
            self.finish_bootstrap();
        },
        KademliaEvent::RoutablePeer { peer, address: _ } => {
            debug!("Found routable peer {peer:?}");
        },
        KademliaEvent::PendingRoutablePeer { peer, address: _ } => {
            debug!("Found pending routable peer {peer:?}");
        },
        KademliaEvent::UnroutablePeer { peer } => {
            debug!("Found unroutable peer {peer:?}");
        },
        KademliaEvent::RoutingUpdated {
            peer: _,
            is_new_peer: _,
            addresses: _,
            bucket_range: _,
            old_peer: _,
        } => {
            debug!("Routing table updated");
        },
        // Any query progress not matched above (e.g. non-final steps of
        // get-closest-peers or bootstrap queries).
        e @ KademliaEvent::OutboundQueryProgressed { .. } => {
            debug!("Not handling dht event {e:?}");
        },
        // Every remaining Kademlia event.
        e => {
            debug!("New unhandled swarm event: {e:?}");
        },
    }
    None
}
507}
508
/// Bookkeeping for one get-record request.
#[derive(Debug)]
pub(crate) struct KadGetQuery {
    /// Backoff consulted for the delay before the request is retried.
    pub(crate) backoff: ExponentialBackoff,
    /// `InProgress(id)` while a Kademlia query is running; `NotStarted` when the
    /// request is rebuilt for a retry.
    pub(crate) progress: DHTProgress,
    /// Channels that receive the record's value once the request resolves.
    pub(crate) notify: Vec<Sender<Vec<u8>>>,
    /// Raw key of the requested record.
    pub(crate) key: Vec<u8>,
    /// Retry budget carried with the request; forwarded in `ClientRequest::GetDHT`
    /// when the request is re-submitted.
    pub(crate) retry_count: u8,
    /// Records collected so far; only records carrying an expiry are stored here.
    pub(crate) records: Vec<Record>,
}
525
/// Bookkeeping for one put-record request.
#[derive(Debug)]
pub struct KadPutQuery {
    /// Backoff advanced when the put fails and consulted for the retry delay.
    pub(crate) backoff: ExponentialBackoff,
    /// Reset to `NotStarted` when the put fails and is queued for a retry.
    pub(crate) progress: DHTProgress,
    /// Signalled with `()` once the put succeeds.
    pub(crate) notify: Sender<()>,
    /// Raw key of the record being stored.
    pub(crate) key: Vec<u8>,
    /// Raw value of the record being stored.
    pub(crate) value: Vec<u8>,
}
540
/// Whether a DHT request currently has a Kademlia query running for it.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum DHTProgress {
    /// A Kademlia query with this id is running for the request.
    InProgress(QueryId),
    /// No query is running (e.g. the request is waiting to be retried).
    NotStarted,
}