espresso_node/request_response/
recipient_source.rs1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use espresso_types::{PubKey, SeqTypes};
6use hotshot::{SystemContext, traits::NodeImplementation};
7use hotshot_types::{data::EpochNumber, epoch_membership::EpochMembershipCoordinator};
8use request_response::recipient_source::RecipientSource as RecipientSourceTrait;
9use tracing::warn;
10
11use super::request::Request;
12
13type Consensus<I> = Arc<SystemContext<SeqTypes, I>>;
15
16#[derive(Clone)]
17pub struct RecipientSource<I: NodeImplementation<SeqTypes>> {
18 pub consensus: Consensus<I>,
20 pub memberships: EpochMembershipCoordinator<SeqTypes>,
22 pub public_key: PubKey,
24}
25
26#[async_trait]
29impl<I: NodeImplementation<SeqTypes>> RecipientSourceTrait<Request, PubKey> for RecipientSource<I> {
30 async fn get_expected_responders(&self, _request: &Request) -> Result<Vec<PubKey>> {
31 let epoch_number = self
33 .consensus
34 .consensus()
35 .read()
36 .await
37 .cur_epoch()
38 .unwrap_or(EpochNumber::genesis());
39
40 let membership = match self
42 .memberships
43 .stake_table_for_epoch(Some(epoch_number))
44 .await
45 {
46 Ok(membership) => membership,
47 Err(e) => {
48 warn!(
49 "Failed to get membership for epoch {}: {e:#}. Failing over to previous epoch",
50 epoch_number
51 );
52 let prev_epoch = epoch_number.saturating_sub(1);
53 self.memberships
54 .stake_table_for_epoch(Some(EpochNumber::new(prev_epoch)))
55 .await
56 .with_context(|| "failed to get stake table for epoch")?
57 },
58 };
59
60 Ok(membership
62 .stake_table()
63 .await
64 .iter()
65 .map(|entry| entry.stake_table_entry.stake_key)
66 .filter(|key| *key != self.public_key)
67 .collect())
68 }
69}