Skip to main content

hotshot_libp2p_networking/network/behaviours/dht/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7/// Task for doing bootstraps at a regular interval
8pub mod bootstrap;
9use std::{collections::HashMap, marker::PhantomData, num::NonZeroUsize, time::Duration};
10
11/// a local caching layer for the DHT key value pairs
12use futures::{
13    SinkExt,
14    channel::{mpsc, oneshot::Sender},
15};
16use hotshot_types::traits::signature_key::SignatureKey;
17use lazy_static::lazy_static;
18use libp2p::kad::{
19    Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent, store::RecordStore,
20};
21use libp2p::kad::{
22    BootstrapOk, GetClosestPeersOk, GetRecordOk, GetRecordResult, ProgressStep, PutRecordResult,
23    QueryId, QueryResult, Record, /* handler::KademliaHandlerIn, */ store::MemoryStore,
24};
25use libp2p_identity::PeerId;
26use store::{
27    persistent::{DhtPersistentStorage, PersistentStore},
28    validated::ValidatedStore,
29};
30use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
31use tracing::{debug, error, warn};
32
33/// Additional DHT record functionality
34pub mod record;
35
36/// Additional DHT store functionality
37pub mod store;
38
39lazy_static! {
40    /// the maximum number of nodes to query in the DHT at any one time
41    static ref MAX_DHT_QUERY_SIZE: NonZeroUsize = NonZeroUsize::new(50).unwrap();
42}
43
44use super::exponential_backoff::ExponentialBackoff;
45use crate::network::{ClientRequest, NetworkEvent, log_summary::LogEvent};
46
47/// Behaviour wrapping libp2p's kademlia
48/// included:
49/// - publishing API
50/// - Request API
51/// - bootstrapping into the network
52/// - peer discovery
53#[derive(Debug)]
54pub struct DHTBehaviour<K: SignatureKey + 'static, D: DhtPersistentStorage> {
55    /// in progress queries for nearby peers
56    pub in_progress_get_closest_peers: HashMap<QueryId, Sender<()>>,
57    /// List of in-progress get requests
58    in_progress_record_queries: HashMap<QueryId, KadGetQuery>,
59    /// The list of in-progress get requests by key
60    outstanding_dht_query_keys: HashMap<Vec<u8>, QueryId>,
61    /// List of in-progress put requests
62    in_progress_put_record_queries: HashMap<QueryId, KadPutQuery>,
63    /// State of bootstrapping
64    pub bootstrap_state: Bootstrap,
65    /// the peer id (useful only for debugging right now)
66    pub peer_id: PeerId,
67    /// replication factor
68    pub replication_factor: NonZeroUsize,
69    /// Sender to retry requests.
70    retry_tx: Option<UnboundedSender<ClientRequest>>,
71    /// Sender to the bootstrap task
72    bootstrap_tx: Option<mpsc::Sender<bootstrap::InputEvent>>,
73
74    /// Phantom type for the key and persistent storage
75    phantom: PhantomData<(K, D)>,
76}
77
78/// State of bootstrapping
79#[derive(Debug, Clone)]
80pub struct Bootstrap {
81    /// State of bootstrap
82    pub state: State,
83    /// Retry timeout
84    pub backoff: ExponentialBackoff,
85}
86
/// State used for random walk and bootstrapping.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// Not in progress
    NotStarted,
    /// In progress
    Started,
}
95
/// Events surfaced by the DHT behaviour.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DHTEvent {
    /// Only event tracked currently is when we successfully bootstrap into the network
    IsBootstrapped,
}
102
103impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
104    /// Give the handler a way to retry requests.
105    pub fn set_retry(&mut self, tx: UnboundedSender<ClientRequest>) {
106        self.retry_tx = Some(tx);
107    }
108    /// Sets a sender to bootstrap task
109    pub fn set_bootstrap_sender(&mut self, tx: mpsc::Sender<bootstrap::InputEvent>) {
110        self.bootstrap_tx = Some(tx);
111    }
112    /// Create a new DHT behaviour
113    #[must_use]
114    pub fn new(pid: PeerId, replication_factor: NonZeroUsize) -> Self {
115        // needed because otherwise we stay in client mode when testing locally
116        // and don't publish keys stuff
117        // e.g. dht just doesn't work. We'd need to add mdns and that doesn't seem worth it since
118        // we won't have a local network
119        // <https://github.com/libp2p/rust-libp2p/issues/4194>
120        Self {
121            peer_id: pid,
122            in_progress_record_queries: HashMap::default(),
123            in_progress_put_record_queries: HashMap::default(),
124            outstanding_dht_query_keys: HashMap::default(),
125            bootstrap_state: Bootstrap {
126                state: State::NotStarted,
127                backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
128            },
129            in_progress_get_closest_peers: HashMap::default(),
130            replication_factor,
131            retry_tx: None,
132            bootstrap_tx: None,
133            phantom: PhantomData,
134        }
135    }
136
137    /// print out the routing table to stderr
138    pub fn print_routing_table(
139        &mut self,
140        kadem: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
141    ) {
142        let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id);
143        let v = kadem.kbuckets().collect::<Vec<_>>();
144        for i in v {
145            for j in i.iter() {
146                let s = format!(
147                    "node: key: {:?}, val {:?}, status: {:?}",
148                    j.node.key, j.node.value, j.status
149                );
150                err.push_str(&s);
151            }
152        }
153        error!("{:?}", err);
154    }
155
156    /// Get the replication factor for queries
157    #[must_use]
158    pub fn replication_factor(&self) -> NonZeroUsize {
159        self.replication_factor
160    }
161    /// Publish a key/value to the kv store.
162    /// Once replicated upon all nodes, the caller is notified over
163    /// `chan`
164    pub fn put_record(&mut self, id: QueryId, query: KadPutQuery) {
165        self.in_progress_put_record_queries.insert(id, query);
166    }
167
168    /// Retrieve a value for a key from the DHT.
169    pub fn get_record(
170        &mut self,
171        key: Vec<u8>,
172        chans: Vec<Sender<Vec<u8>>>,
173        backoff: ExponentialBackoff,
174        retry_count: u8,
175        kad: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
176    ) {
177        // noop
178        if retry_count == 0 {
179            return;
180        }
181
182        // Check the cache before making the (expensive) query
183        if let Some(entry) = kad.store_mut().get(&key.clone().into()) {
184            // The key already exists in the cache, send the value to all channels
185            for chan in chans {
186                if chan.send(entry.value.clone()).is_err() {
187                    debug!(
188                        "Get DHT: channel closed before get record request result could be sent"
189                    );
190                }
191            }
192        } else {
193            // Check if the key is already being queried
194            if let Some(qid) = self.outstanding_dht_query_keys.get(&key) {
195                // The key was already being queried. Add the channel to the existing query
196                // Try to get the query from the query id
197                let Some(query) = self.in_progress_record_queries.get_mut(qid) else {
198                    warn!("Get DHT: outstanding query not found");
199                    return;
200                };
201
202                // Add the channel to the existing query
203                query.notify.extend(chans);
204            } else {
205                // The key was not already being queried and was not in the cache. Start a new query.
206                let qid = kad.get_record(key.clone().into());
207                let query = KadGetQuery {
208                    backoff,
209                    progress: DHTProgress::InProgress(qid),
210                    notify: chans,
211                    key: key.clone(),
212                    retry_count: retry_count - 1,
213                    records: Vec::new(),
214                };
215
216                // Add the key to the outstanding queries and in-progress queries
217                self.outstanding_dht_query_keys.insert(key, qid);
218                self.in_progress_record_queries.insert(qid, query);
219            }
220        }
221    }
222
223    /// Spawn a task which will retry the query after a backoff.
224    fn retry_get(&self, mut query: KadGetQuery) {
225        let Some(tx) = self.retry_tx.clone() else {
226            return;
227        };
228        let req = ClientRequest::GetDHT {
229            key: query.key,
230            notify: query.notify,
231            retry_count: query.retry_count,
232        };
233        let backoff = query.backoff.next_timeout(false);
234        spawn(async move {
235            sleep(backoff).await;
236            let _ = tx.send(req);
237        });
238    }
239
240    /// Spawn a task which will retry the query after a backoff.
241    fn retry_put(&self, mut query: KadPutQuery) {
242        let Some(tx) = self.retry_tx.clone() else {
243            return;
244        };
245        let req = ClientRequest::PutDHT {
246            key: query.key,
247            value: query.value,
248            notify: query.notify,
249        };
250        spawn(async move {
251            sleep(query.backoff.next_timeout(false)).await;
252            let _ = tx.send(req);
253        });
254    }
255
256    /// update state based on recv-ed get query
257    fn handle_get_query(
258        &mut self,
259        store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
260        record_results: GetRecordResult,
261        id: QueryId,
262        mut last: bool,
263    ) {
264        let found = match self.in_progress_record_queries.get_mut(&id) {
265            Some(query) => match record_results {
266                Ok(results) => match results {
267                    GetRecordOk::FoundRecord(record) => {
268                        // Make sure the record has an expiration time
269                        if record.record.expires.is_some() {
270                            query.records.push(record.record);
271                            true
272                        } else {
273                            false
274                        }
275                    },
276                    GetRecordOk::FinishedWithNoAdditionalRecord {
277                        cache_candidates: _,
278                    } => {
279                        tracing::debug!("GetRecord Finished with No Additional Record");
280                        last = true;
281                        false
282                    },
283                },
284                Err(err) => {
285                    LogEvent::DhtKadQueryError.record();
286                    debug!("Error in Kademlia query: {err:?}");
287                    false
288                },
289            },
290            None => {
291                // We already finished the query (or it's been cancelled). Do nothing and exit the
292                // function.
293                return;
294            },
295        };
296
297        // If we have more than one record or the query has completed, we can return the record to the client.
298        if (found || last)
299            && let Some(KadGetQuery {
300                backoff,
301                progress,
302                notify,
303                key,
304                retry_count,
305                records,
306            }) = self.in_progress_record_queries.remove(&id)
307        {
308            // Remove the key from the outstanding queries so we are in sync
309            self.outstanding_dht_query_keys.remove(&key);
310
311            // `notify` is all channels that are still open
312            let notify = notify
313                .into_iter()
314                .filter(|n| !n.is_canceled())
315                .collect::<Vec<_>>();
316
317            // If all are closed, we can exit
318            if notify.is_empty() {
319                return;
320            }
321
322            // Find the record with the highest expiry
323            if let Some(record) = records.into_iter().max_by_key(|r| r.expires.unwrap()) {
324                // Only return the record if we can store it (validation passed)
325                if store.put(record.clone()).is_ok() {
326                    // Send the record to all channels that are still open
327                    for n in notify {
328                        if n.send(record.value.clone()).is_err() {
329                            debug!(
330                                "Get DHT: channel closed before get record request result could \
331                                 be sent"
332                            );
333                        }
334                    }
335                } else {
336                    error!("Failed to store record in local store");
337                }
338            }
339            // disagreement => query more nodes
340            else if retry_count > 0 {
341                let new_retry_count = retry_count - 1;
342                debug!(
343                    "Get DHT: Internal disagreement for get dht request {progress:?}! requerying \
344                     with more nodes. {new_retry_count:?} retries left"
345                );
346                self.retry_get(KadGetQuery {
347                    backoff,
348                    progress: DHTProgress::NotStarted,
349                    notify,
350                    key,
351                    retry_count: new_retry_count,
352                    records: Vec::new(),
353                });
354            } else {
355                LogEvent::DhtDisagreementGivenUp.record();
356                debug!(
357                    "Get DHT: Internal disagreement for get dht request {progress:?}! Giving up \
358                     because out of retries. "
359                );
360            }
361        }
362    }
363
364    /// Update state based on put query
365    fn handle_put_query(&mut self, record_results: PutRecordResult, id: QueryId) {
366        if let Some(mut query) = self.in_progress_put_record_queries.remove(&id) {
367            // dropped so we handle further
368            if query.notify.is_canceled() {
369                return;
370            }
371
372            match record_results {
373                Ok(_) => {
374                    if query.notify.send(()).is_err() {
375                        warn!(
376                            "Put DHT: client channel closed before put record request could be \
377                             sent"
378                        );
379                    }
380                },
381                Err(e) => {
382                    query.progress = DHTProgress::NotStarted;
383                    query.backoff.start_next(false);
384
385                    warn!(
386                        "Put DHT: error performing put: {:?}. Retrying on pid {:?}.",
387                        e, self.peer_id
388                    );
389                    // push back onto the queue
390                    self.retry_put(query);
391                },
392            }
393        } else {
394            warn!("Put DHT: completed DHT query that is no longer tracked.");
395        }
396    }
397
398    /// Send that the bootstrap succeeded
399    fn finish_bootstrap(&mut self) {
400        if let Some(mut tx) = self.bootstrap_tx.clone() {
401            spawn(async move { tx.send(bootstrap::InputEvent::BootstrapFinished).await });
402        }
403    }
404
405    #[allow(clippy::too_many_lines)]
406    /// handle a DHT event
407    pub fn dht_handle_event(
408        &mut self,
409        event: KademliaEvent,
410        store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
411    ) -> Option<NetworkEvent> {
412        match event {
413            KademliaEvent::OutboundQueryProgressed {
414                result: QueryResult::PutRecord(record_results),
415                id,
416                step: ProgressStep { last, .. },
417                ..
418            } => {
419                if last {
420                    self.handle_put_query(record_results, id);
421                }
422            },
423            KademliaEvent::OutboundQueryProgressed {
424                result: QueryResult::GetClosestPeers(r),
425                id: query_id,
426                stats: _,
427                step: ProgressStep { last: true, .. },
428                ..
429            } => match r {
430                Ok(GetClosestPeersOk { key, peers: _ }) => {
431                    if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id)
432                        && chan.send(()).is_err()
433                    {
434                        warn!("DHT: finished query but client was no longer interested");
435                    };
436                    debug!("Successfully got closest peers for key {key:?}");
437                },
438                Err(e) => {
439                    if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
440                        let _: Result<_, _> = chan.send(());
441                    };
442                    LogEvent::DhtClosestPeersFailure.record();
443                    debug!("Failed to get closest peers: {e:?}");
444                },
445            },
446            KademliaEvent::OutboundQueryProgressed {
447                result: QueryResult::GetRecord(record_results),
448                id,
449                step: ProgressStep { last, .. },
450                ..
451            } => {
452                self.handle_get_query(store, record_results, id, last);
453            },
454            KademliaEvent::OutboundQueryProgressed {
455                result:
456                    QueryResult::Bootstrap(Ok(BootstrapOk {
457                        peer: _,
458                        num_remaining,
459                    })),
460                step: ProgressStep { last: true, .. },
461                ..
462            } => {
463                if num_remaining == 0 {
464                    self.finish_bootstrap();
465                } else {
466                    debug!("Bootstrap in progress, {num_remaining} nodes remaining");
467                }
468                return Some(NetworkEvent::IsBootstrapped);
469            },
470            KademliaEvent::OutboundQueryProgressed {
471                result: QueryResult::Bootstrap(Err(e)),
472                ..
473            } => {
474                let BootstrapError::Timeout { num_remaining, .. } = e;
475                if num_remaining.is_none() {
476                    error!("Failed to bootstrap: {e:?}");
477                }
478                self.finish_bootstrap();
479            },
480            KademliaEvent::RoutablePeer { peer, address: _ } => {
481                debug!("Found routable peer {peer:?}");
482            },
483            KademliaEvent::PendingRoutablePeer { peer, address: _ } => {
484                debug!("Found pending routable peer {peer:?}");
485            },
486            KademliaEvent::UnroutablePeer { peer } => {
487                debug!("Found unroutable peer {peer:?}");
488            },
489            KademliaEvent::RoutingUpdated {
490                peer: _,
491                is_new_peer: _,
492                addresses: _,
493                bucket_range: _,
494                old_peer: _,
495            } => {
496                debug!("Routing table updated");
497            },
498            e @ KademliaEvent::OutboundQueryProgressed { .. } => {
499                debug!("Not handling dht event {e:?}");
500            },
501            e => {
502                debug!("New unhandled swarm event: {e:?}");
503            },
504        }
505        None
506    }
507}
508
509/// Metadata holder for get query
510#[derive(Debug)]
511pub(crate) struct KadGetQuery {
512    /// Exponential retry backoff
513    pub(crate) backoff: ExponentialBackoff,
514    /// progress through DHT query
515    pub(crate) progress: DHTProgress,
516    /// The channels to notify of the result
517    pub(crate) notify: Vec<Sender<Vec<u8>>>,
518    /// the key to look up
519    pub(crate) key: Vec<u8>,
520    /// the number of remaining retries before giving up
521    pub(crate) retry_count: u8,
522    /// already received records
523    pub(crate) records: Vec<Record>,
524}
525
526/// Metadata holder for get query
527#[derive(Debug)]
528pub struct KadPutQuery {
529    /// Exponential retry backoff
530    pub(crate) backoff: ExponentialBackoff,
531    /// progress through DHT query
532    pub(crate) progress: DHTProgress,
533    /// notify client of result
534    pub(crate) notify: Sender<()>,
535    /// the key to put
536    pub(crate) key: Vec<u8>,
537    /// the value to put
538    pub(crate) value: Vec<u8>,
539}
540
541/// represents progress through DHT
542#[derive(Debug, Clone, Eq, Hash, PartialEq)]
543pub enum DHTProgress {
544    /// The query has been started
545    InProgress(QueryId),
546    /// The query has not been started
547    NotStarted,
548}