espresso_node/request_response/
recipient_source.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use espresso_types::{PubKey, SeqTypes};
6use hotshot::{SystemContext, traits::NodeImplementation};
7use hotshot_types::{data::EpochNumber, epoch_membership::EpochMembershipCoordinator};
8use request_response::recipient_source::RecipientSource as RecipientSourceTrait;
9use tracing::warn;
10
11use super::request::Request;
12
/// Shorthand for the shared (`Arc`-wrapped) `hotshot` `SystemContext`, specialized to `SeqTypes`
/// and generic over the node implementation `I`.
type Consensus<I> = Arc<SystemContext<SeqTypes, I>>;
15
16#[derive(Clone)]
17pub struct RecipientSource<I: NodeImplementation<SeqTypes>> {
18    /// A copy of the consensus context
19    pub consensus: Consensus<I>,
20    /// A copy of the membership coordinator
21    pub memberships: EpochMembershipCoordinator<SeqTypes>,
22    /// The public key of the node
23    pub public_key: PubKey,
24}
25
26/// Implement the RecipientSourceTrait, which allows the request-response protocol to derive the
27/// intended recipients for a given request
28#[async_trait]
29impl<I: NodeImplementation<SeqTypes>> RecipientSourceTrait<Request, PubKey> for RecipientSource<I> {
30    async fn get_expected_responders(&self, _request: &Request) -> Result<Vec<PubKey>> {
31        // Get the current epoch number
32        let epoch_number = self
33            .consensus
34            .consensus()
35            .read()
36            .await
37            .cur_epoch()
38            .unwrap_or(EpochNumber::genesis());
39
40        // Attempt to get the membership for the current epoch
41        let membership = match self
42            .memberships
43            .stake_table_for_epoch(Some(epoch_number))
44            .await
45        {
46            Ok(membership) => membership,
47            Err(e) => {
48                warn!(
49                    "Failed to get membership for epoch {}: {e:#}. Failing over to previous epoch",
50                    epoch_number
51                );
52                let prev_epoch = epoch_number.saturating_sub(1);
53                self.memberships
54                    .stake_table_for_epoch(Some(EpochNumber::new(prev_epoch)))
55                    .await
56                    .with_context(|| "failed to get stake table for epoch")?
57            },
58        };
59
60        // Sum all participants in the membership
61        Ok(membership
62            .stake_table()
63            .await
64            .iter()
65            .map(|entry| entry.stake_table_entry.stake_key)
66            .filter(|key| *key != self.public_key)
67            .collect())
68    }
69}