Skip to main content

espresso_types/v0/v0_1/
l1.rs

1use std::{
2    num::NonZeroUsize,
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use alloy::{
8    network::Ethereum,
9    primitives::{B256, U256},
10    providers::{
11        Identity, Provider, RootProvider,
12        fillers::{FillProvider, JoinFill, RecommendedFillers},
13    },
14    transports::http::{Client, Http},
15};
16use alloy_compat::ethers_serde;
17use async_broadcast::{InactiveReceiver, Sender};
18use clap::Parser;
19use derive_more::Deref;
20use hotshot_types::traits::metrics::{Counter, Gauge, Metrics, NoMetrics};
21use lru::LruCache;
22use parking_lot::RwLock;
23use serde::{Deserialize, Serialize};
24use tokio::{
25    sync::{Mutex, Notify},
26    task::JoinHandle,
27};
28use url::Url;
29
30use crate::v0::utils::parse_duration;
31
32#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Hash, PartialEq, Eq)]
33pub struct L1BlockInfo {
34    pub number: u64,
35    #[serde(with = "ethers_serde::u256")]
36    pub timestamp: U256,
37    #[serde(with = "ethers_serde::b256")]
38    pub hash: B256,
39}
40
41#[derive(Clone, Copy, Debug, PartialOrd, Ord, Hash, PartialEq, Eq)]
42pub(crate) struct L1BlockInfoWithParent {
43    pub(crate) info: L1BlockInfo,
44    pub(crate) parent_hash: B256,
45}
46
47#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Hash, PartialEq, Eq)]
48pub struct L1Snapshot {
49    /// The relevant snapshot of the L1 includes a reference to the current head of the L1 chain.
50    ///
51    /// Note that the L1 head is subject to changing due to a reorg. However, no reorg will change
52    /// the _number_ of this block in the chain: L1 block numbers will always be sequentially
53    /// increasing. Therefore, the sequencer does not have to worry about reorgs invalidating this
54    /// snapshot.
55    pub head: u64,
56
57    /// The snapshot also includes information about the latest finalized L1 block.
58    ///
59    /// Since this block is finalized (ie cannot be reorged) we can include specific information
60    /// about the particular block, such as its hash and timestamp.
61    ///
62    /// This block may be `None` in the rare case where Espresso has started shortly after the
63    /// genesis of the L1, and the L1 has yet to finalize a block. In all other cases it will be
64    /// `Some`.
65    pub finalized: Option<L1BlockInfo>,
66}
67
68/// Configuration for an L1 client.
69#[derive(Clone, Debug, Parser)]
70pub struct L1ClientOptions {
71    /// Delay when retrying failed L1 queries.
72    #[clap(
73        long,
74        env = "ESPRESSO_L1_RETRY_DELAY",
75        default_value = "1s",
76        value_parser = parse_duration,
77    )]
78    pub l1_retry_delay: Duration,
79
80    /// Request rate when polling L1.
81    #[clap(
82        long,
83        env = "ESPRESSO_L1_POLLING_INTERVAL",
84        default_value = "7s",
85        value_parser = parse_duration,
86    )]
87    pub l1_polling_interval: Duration,
88
89    /// Maximum number of L1 blocks to keep in cache at once.
90    #[clap(long, env = "ESPRESSO_L1_BLOCKS_CACHE_SIZE", default_value = "100")]
91    pub l1_blocks_cache_size: NonZeroUsize,
92
93    /// Number of L1 events to buffer before discarding.
94    #[clap(
95        long,
96        env = "ESPRESSO_L1_EVENTS_CHANNEL_CAPACITY",
97        default_value = "100"
98    )]
99    pub l1_events_channel_capacity: usize,
100
101    /// Maximum number of L1 blocks that can be scanned for events in a single query.
102    #[clap(
103        long,
104        env = "ESPRESSO_L1_EVENTS_MAX_BLOCK_RANGE",
105        default_value = "10000"
106    )]
107    pub l1_events_max_block_range: u64,
108
109    /// Maximum time to wait for new heads before considering a stream invalid and reconnecting.
110    #[clap(
111        long,
112        env = "ESPRESSO_L1_SUBSCRIPTION_TIMEOUT",
113        default_value = "1m",
114        value_parser = parse_duration,
115    )]
116    pub subscription_timeout: Duration,
117
118    /// Fail over to another provider if the current provider fails twice within this window.
119    #[clap(
120        long,
121        env = "ESPRESSO_L1_FREQUENT_FAILURE_TOLERANCE",
122        default_value = "1m",
123        value_parser = parse_duration,
124    )]
125    pub l1_frequent_failure_tolerance: Duration,
126
127    /// Fail over to another provider if the current provider fails many times in a row, within any
128    /// time window.
129    #[clap(
130        long,
131        env = "ESPRESSO_L1_CONSECUTIVE_FAILURE_TOLERANCE",
132        default_value = "10"
133    )]
134    pub l1_consecutive_failure_tolerance: usize,
135
136    /// Revert back to the first provider this duration after failing over.
137    #[clap(
138        long,
139        env = "ESPRESSO_L1_FAILOVER_REVERT",
140        default_value = "30m",
141        value_parser = parse_duration,
142    )]
143    pub l1_failover_revert: Duration,
144
145    /// Amount of time to wait after receiving a 429 response before making more L1 RPC requests.
146    ///
147    /// If not set, the general l1-retry-delay will be used.
148    #[clap(
149        long,
150        env = "ESPRESSO_L1_RATE_LIMIT_DELAY",
151        value_parser = parse_duration,
152    )]
153    pub l1_rate_limit_delay: Option<Duration>,
154
155    /// Separate provider to use for subscription feeds.
156    ///
157    /// Typically this would be a WebSockets endpoint while the main provider uses HTTP.
158    #[clap(long, env = "ESPRESSO_L1_WS_PROVIDER", value_delimiter = ',')]
159    pub l1_ws_provider: Option<Vec<Url>>,
160
161    /// Interval at which the background update loop polls the L1 stake table contract for new events
162    /// and updates local persistence.
163    ///
164    #[clap(
165        long,
166        env = "ESPRESSO_NODE_L1_STAKE_TABLE_UPDATE_INTERVAL",
167        default_value = "60m",
168        value_parser = parse_duration,
169    )]
170    pub stake_table_update_interval: Duration,
171
172    /// Maximum duration to retry fetching L1 events before panicking.
173    ///
174    /// This prevents infinite retries by panicking if the total number of retries exceed the maximum duration.
175    /// This is helpful in cases where the RPC block range limit or the event return limit is hit,
176    /// or if there is an outage. In such cases, panicking ensures that the node operator can take
177    /// action instead of the node getting stuck indefinitely. This is necessary because the stake table is constructed
178    /// from the fetched events, and is required for node to participate in consensus.
179    #[clap(
180        long,
181        env = "ESPRESSO_L1_EVENTS_MAX_RETRY_DURATION",
182        default_value = "20m",
183        value_parser = parse_duration,
184    )]
185    pub l1_events_max_retry_duration: Duration,
186
187    /// A block range which is expected to contain the finalized heads of all L1 provider chains.
188    ///
189    /// If specified, it is assumed that if a block `n` is known to be finalized according to a
190    /// certain provider, then any block less than `n - L1_FINALIZED_SAFETY_MARGIN` is finalized
191    /// _according to any provider_. In other words, if we fail over from one provider to another,
192    /// the second provider will never be lagging the first by more than this margin.
193    ///
194    /// This allows us to quickly query for very old finalized blocks by number. Without this
195    /// assumption, we always need to verify that a block is finalized by fetching all blocks in a
196    /// hash chain between the known finalized block and the desired block, recomputing and checking
197    /// the hashes. This is fine and good for blocks very near the finalized head, but for
198    /// extremely old blocks it is prohibitively expensive, and these old blocks are extremely
199    /// unlikely to be unfinalized anyways.
200    #[clap(long, env = "ESPRESSO_L1_FINALIZED_SAFETY_MARGIN")]
201    pub l1_finalized_safety_margin: Option<u64>,
202
203    #[clap(skip = Arc::<Box<dyn Metrics>>::new(Box::new(NoMetrics)))]
204    pub metrics: Arc<Box<dyn Metrics>>,
205}
206
207/// Type alias for alloy provider
208pub type L1Provider = FillProvider<
209    JoinFill<Identity, <Ethereum as RecommendedFillers>::RecommendedFillers>,
210    RootProvider,
211>;
212
213#[derive(Clone, Debug, Deref)]
214/// An Ethereum provider and configuration to interact with the L1.
215///
216/// This client runs asynchronously, updating an in-memory snapshot of the relevant L1 information
217/// each time a new L1 block is published. The main advantage of this is that we can update the L1
218/// state at the pace of the L1, instead of the much faster pace of HotShot consensus.This makes it
219/// easy to use a subscription instead of polling for new blocks, vastly reducing the number of L1
220/// RPC calls we make.
221pub struct L1Client {
222    /// The alloy provider used for L1 communication with wallet and default fillers
223    #[deref]
224    pub provider: L1Provider,
225    /// Actual transport used in `self.provider`
226    /// i.e. the `t` variable in `ProviderBuilder::new().on_client(RpcClient::new(t, is_local))`
227    pub transport: SwitchingTransport,
228    /// Shared state updated by an asynchronous task which polls the L1.
229    pub(crate) state: Arc<Mutex<L1State>>,
230    /// Channel used by the async update task to send events to clients.
231    pub(crate) sender: Sender<L1Event>,
232    /// Receiver for events from the async update task.
233    pub(crate) receiver: InactiveReceiver<L1Event>,
234    /// Async task which updates the shared state.
235    pub(crate) update_task: Arc<L1UpdateTask>,
236}
237
238impl Provider for L1Client {
239    fn root(&self) -> &RootProvider {
240        self.provider.root()
241    }
242}
243
244/// In-memory view of the L1 state, updated asynchronously.
245#[derive(Debug)]
246pub(crate) struct L1State {
247    pub(crate) snapshot: L1Snapshot,
248    pub(crate) finalized: LruCache<u64, L1BlockInfoWithParent>,
249    pub(crate) last_finalized: Option<u64>,
250}
251
252#[derive(Clone, Debug)]
253pub(crate) enum L1Event {
254    NewHead { head: u64 },
255    NewFinalized { finalized: L1BlockInfoWithParent },
256}
257
258#[derive(Debug, Default)]
259pub(crate) struct L1UpdateTask(pub(crate) Mutex<Option<JoinHandle<()>>>);
260
261#[derive(Clone, Debug)]
262pub(crate) struct L1ClientMetrics {
263    pub(crate) head: Arc<dyn Gauge>,
264    pub(crate) finalized: Arc<dyn Gauge>,
265    pub(crate) reconnects: Arc<dyn Counter>,
266    pub(crate) failovers: Arc<dyn Counter>,
267    pub(crate) failures: Arc<Vec<Box<dyn Counter>>>,
268}
269
270/// An RPC client with multiple remote (HTTP) providers.
271///
272/// This client utilizes one RPC provider at a time, but if it detects that the provider is in a
273/// failing state, it will automatically switch to the next provider in its list.
274#[derive(Clone, Debug)]
275pub struct SwitchingTransport {
276    /// The transport currently being used by the client
277    pub(crate) current_transport: Arc<RwLock<SingleTransport>>,
278    /// The list of configured HTTP URLs to use for RPC requests
279    pub(crate) urls: Arc<Vec<Url>>,
280    pub(crate) opt: Arc<L1ClientOptions>,
281    pub(crate) metrics: L1ClientMetrics,
282    pub(crate) switch_notify: Arc<Notify>,
283}
284
285/// The state of the current provider being used by a [`SwitchingTransport`].
286/// This is cloneable and returns a reference to the same underlying data.
287#[derive(Debug, Clone)]
288pub(crate) struct SingleTransport {
289    pub(crate) generation: usize,
290    pub(crate) client: Http<Client>,
291    pub(crate) status: Arc<RwLock<SingleTransportStatus>>,
292    /// Time at which to revert back to the primary provider after a failover.
293    pub(crate) revert_at: Option<Instant>,
294}
295
/// The status of a single transport.
///
/// The derived `Default` describes a transport with no recorded failures, no rate limit in
/// effect, and not shutting down.
#[derive(Default, Debug)]
pub(crate) struct SingleTransportStatus {
    /// Instant of the most recent recorded failure, if any.
    pub(crate) last_failure: Option<Instant>,

    /// Count of consecutive failures.
    pub(crate) consecutive_failures: usize,

    /// Instant until which the transport is considered rate limited, if any.
    pub(crate) rate_limited_until: Option<Instant>,

    /// Whether or not this current transport is being shut down (switching to the next
    /// transport).
    pub(crate) shutting_down: bool,
}