espresso_types/v0/v0_1/l1.rs
1use std::{
2 num::NonZeroUsize,
3 sync::Arc,
4 time::{Duration, Instant},
5};
6
7use alloy::{
8 network::Ethereum,
9 primitives::{B256, U256},
10 providers::{
11 Identity, Provider, RootProvider,
12 fillers::{FillProvider, JoinFill, RecommendedFillers},
13 },
14 transports::http::{Client, Http},
15};
16use alloy_compat::ethers_serde;
17use async_broadcast::{InactiveReceiver, Sender};
18use clap::Parser;
19use derive_more::Deref;
20use hotshot_types::traits::metrics::{Counter, Gauge, Metrics, NoMetrics};
21use lru::LruCache;
22use parking_lot::RwLock;
23use serde::{Deserialize, Serialize};
24use tokio::{
25 sync::{Mutex, Notify},
26 task::JoinHandle,
27};
28use url::Url;
29
30use crate::v0::utils::parse_duration;
31
32#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Hash, PartialEq, Eq)]
33pub struct L1BlockInfo {
34 pub number: u64,
35 #[serde(with = "ethers_serde::u256")]
36 pub timestamp: U256,
37 #[serde(with = "ethers_serde::b256")]
38 pub hash: B256,
39}
40
41#[derive(Clone, Copy, Debug, PartialOrd, Ord, Hash, PartialEq, Eq)]
42pub(crate) struct L1BlockInfoWithParent {
43 pub(crate) info: L1BlockInfo,
44 pub(crate) parent_hash: B256,
45}
46
47#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Hash, PartialEq, Eq)]
48pub struct L1Snapshot {
49 /// The relevant snapshot of the L1 includes a reference to the current head of the L1 chain.
50 ///
51 /// Note that the L1 head is subject to changing due to a reorg. However, no reorg will change
52 /// the _number_ of this block in the chain: L1 block numbers will always be sequentially
53 /// increasing. Therefore, the sequencer does not have to worry about reorgs invalidating this
54 /// snapshot.
55 pub head: u64,
56
57 /// The snapshot also includes information about the latest finalized L1 block.
58 ///
59 /// Since this block is finalized (ie cannot be reorged) we can include specific information
60 /// about the particular block, such as its hash and timestamp.
61 ///
62 /// This block may be `None` in the rare case where Espresso has started shortly after the
63 /// genesis of the L1, and the L1 has yet to finalize a block. In all other cases it will be
64 /// `Some`.
65 pub finalized: Option<L1BlockInfo>,
66}
67
68/// Configuration for an L1 client.
69#[derive(Clone, Debug, Parser)]
70pub struct L1ClientOptions {
71 /// Delay when retrying failed L1 queries.
72 #[clap(
73 long,
74 env = "ESPRESSO_L1_RETRY_DELAY",
75 default_value = "1s",
76 value_parser = parse_duration,
77 )]
78 pub l1_retry_delay: Duration,
79
80 /// Request rate when polling L1.
81 #[clap(
82 long,
83 env = "ESPRESSO_L1_POLLING_INTERVAL",
84 default_value = "7s",
85 value_parser = parse_duration,
86 )]
87 pub l1_polling_interval: Duration,
88
89 /// Maximum number of L1 blocks to keep in cache at once.
90 #[clap(long, env = "ESPRESSO_L1_BLOCKS_CACHE_SIZE", default_value = "100")]
91 pub l1_blocks_cache_size: NonZeroUsize,
92
93 /// Number of L1 events to buffer before discarding.
94 #[clap(
95 long,
96 env = "ESPRESSO_L1_EVENTS_CHANNEL_CAPACITY",
97 default_value = "100"
98 )]
99 pub l1_events_channel_capacity: usize,
100
101 /// Maximum number of L1 blocks that can be scanned for events in a single query.
102 #[clap(
103 long,
104 env = "ESPRESSO_L1_EVENTS_MAX_BLOCK_RANGE",
105 default_value = "10000"
106 )]
107 pub l1_events_max_block_range: u64,
108
109 /// Maximum time to wait for new heads before considering a stream invalid and reconnecting.
110 #[clap(
111 long,
112 env = "ESPRESSO_L1_SUBSCRIPTION_TIMEOUT",
113 default_value = "1m",
114 value_parser = parse_duration,
115 )]
116 pub subscription_timeout: Duration,
117
118 /// Fail over to another provider if the current provider fails twice within this window.
119 #[clap(
120 long,
121 env = "ESPRESSO_L1_FREQUENT_FAILURE_TOLERANCE",
122 default_value = "1m",
123 value_parser = parse_duration,
124 )]
125 pub l1_frequent_failure_tolerance: Duration,
126
127 /// Fail over to another provider if the current provider fails many times in a row, within any
128 /// time window.
129 #[clap(
130 long,
131 env = "ESPRESSO_L1_CONSECUTIVE_FAILURE_TOLERANCE",
132 default_value = "10"
133 )]
134 pub l1_consecutive_failure_tolerance: usize,
135
136 /// Revert back to the first provider this duration after failing over.
137 #[clap(
138 long,
139 env = "ESPRESSO_L1_FAILOVER_REVERT",
140 default_value = "30m",
141 value_parser = parse_duration,
142 )]
143 pub l1_failover_revert: Duration,
144
145 /// Amount of time to wait after receiving a 429 response before making more L1 RPC requests.
146 ///
147 /// If not set, the general l1-retry-delay will be used.
148 #[clap(
149 long,
150 env = "ESPRESSO_L1_RATE_LIMIT_DELAY",
151 value_parser = parse_duration,
152 )]
153 pub l1_rate_limit_delay: Option<Duration>,
154
155 /// Separate provider to use for subscription feeds.
156 ///
157 /// Typically this would be a WebSockets endpoint while the main provider uses HTTP.
158 #[clap(long, env = "ESPRESSO_L1_WS_PROVIDER", value_delimiter = ',')]
159 pub l1_ws_provider: Option<Vec<Url>>,
160
161 /// Interval at which the background update loop polls the L1 stake table contract for new events
162 /// and updates local persistence.
163 ///
164 #[clap(
165 long,
166 env = "ESPRESSO_NODE_L1_STAKE_TABLE_UPDATE_INTERVAL",
167 default_value = "60m",
168 value_parser = parse_duration,
169 )]
170 pub stake_table_update_interval: Duration,
171
172 /// Maximum duration to retry fetching L1 events before panicking.
173 ///
174 /// This prevents infinite retries by panicking if the total number of retries exceed the maximum duration.
175 /// This is helpful in cases where the RPC block range limit or the event return limit is hit,
176 /// or if there is an outage. In such cases, panicking ensures that the node operator can take
177 /// action instead of the node getting stuck indefinitely. This is necessary because the stake table is constructed
178 /// from the fetched events, and is required for node to participate in consensus.
179 #[clap(
180 long,
181 env = "ESPRESSO_L1_EVENTS_MAX_RETRY_DURATION",
182 default_value = "20m",
183 value_parser = parse_duration,
184 )]
185 pub l1_events_max_retry_duration: Duration,
186
187 /// A block range which is expected to contain the finalized heads of all L1 provider chains.
188 ///
189 /// If specified, it is assumed that if a block `n` is known to be finalized according to a
190 /// certain provider, then any block less than `n - L1_FINALIZED_SAFETY_MARGIN` is finalized
191 /// _according to any provider_. In other words, if we fail over from one provider to another,
192 /// the second provider will never be lagging the first by more than this margin.
193 ///
194 /// This allows us to quickly query for very old finalized blocks by number. Without this
195 /// assumption, we always need to verify that a block is finalized by fetching all blocks in a
196 /// hash chain between the known finalized block and the desired block, recomputing and checking
197 /// the hashes. This is fine and good for blocks very near the finalized head, but for
198 /// extremely old blocks it is prohibitively expensive, and these old blocks are extremely
199 /// unlikely to be unfinalized anyways.
200 #[clap(long, env = "ESPRESSO_L1_FINALIZED_SAFETY_MARGIN")]
201 pub l1_finalized_safety_margin: Option<u64>,
202
203 #[clap(skip = Arc::<Box<dyn Metrics>>::new(Box::new(NoMetrics)))]
204 pub metrics: Arc<Box<dyn Metrics>>,
205}
206
207/// Type alias for alloy provider
208pub type L1Provider = FillProvider<
209 JoinFill<Identity, <Ethereum as RecommendedFillers>::RecommendedFillers>,
210 RootProvider,
211>;
212
213#[derive(Clone, Debug, Deref)]
214/// An Ethereum provider and configuration to interact with the L1.
215///
216/// This client runs asynchronously, updating an in-memory snapshot of the relevant L1 information
217/// each time a new L1 block is published. The main advantage of this is that we can update the L1
218/// state at the pace of the L1, instead of the much faster pace of HotShot consensus.This makes it
219/// easy to use a subscription instead of polling for new blocks, vastly reducing the number of L1
220/// RPC calls we make.
221pub struct L1Client {
222 /// The alloy provider used for L1 communication with wallet and default fillers
223 #[deref]
224 pub provider: L1Provider,
225 /// Actual transport used in `self.provider`
226 /// i.e. the `t` variable in `ProviderBuilder::new().on_client(RpcClient::new(t, is_local))`
227 pub transport: SwitchingTransport,
228 /// Shared state updated by an asynchronous task which polls the L1.
229 pub(crate) state: Arc<Mutex<L1State>>,
230 /// Channel used by the async update task to send events to clients.
231 pub(crate) sender: Sender<L1Event>,
232 /// Receiver for events from the async update task.
233 pub(crate) receiver: InactiveReceiver<L1Event>,
234 /// Async task which updates the shared state.
235 pub(crate) update_task: Arc<L1UpdateTask>,
236}
237
238impl Provider for L1Client {
239 fn root(&self) -> &RootProvider {
240 self.provider.root()
241 }
242}
243
244/// In-memory view of the L1 state, updated asynchronously.
245#[derive(Debug)]
246pub(crate) struct L1State {
247 pub(crate) snapshot: L1Snapshot,
248 pub(crate) finalized: LruCache<u64, L1BlockInfoWithParent>,
249 pub(crate) last_finalized: Option<u64>,
250}
251
252#[derive(Clone, Debug)]
253pub(crate) enum L1Event {
254 NewHead { head: u64 },
255 NewFinalized { finalized: L1BlockInfoWithParent },
256}
257
258#[derive(Debug, Default)]
259pub(crate) struct L1UpdateTask(pub(crate) Mutex<Option<JoinHandle<()>>>);
260
261#[derive(Clone, Debug)]
262pub(crate) struct L1ClientMetrics {
263 pub(crate) head: Arc<dyn Gauge>,
264 pub(crate) finalized: Arc<dyn Gauge>,
265 pub(crate) reconnects: Arc<dyn Counter>,
266 pub(crate) failovers: Arc<dyn Counter>,
267 pub(crate) failures: Arc<Vec<Box<dyn Counter>>>,
268}
269
270/// An RPC client with multiple remote (HTTP) providers.
271///
272/// This client utilizes one RPC provider at a time, but if it detects that the provider is in a
273/// failing state, it will automatically switch to the next provider in its list.
274#[derive(Clone, Debug)]
275pub struct SwitchingTransport {
276 /// The transport currently being used by the client
277 pub(crate) current_transport: Arc<RwLock<SingleTransport>>,
278 /// The list of configured HTTP URLs to use for RPC requests
279 pub(crate) urls: Arc<Vec<Url>>,
280 pub(crate) opt: Arc<L1ClientOptions>,
281 pub(crate) metrics: L1ClientMetrics,
282 pub(crate) switch_notify: Arc<Notify>,
283}
284
285/// The state of the current provider being used by a [`SwitchingTransport`].
286/// This is cloneable and returns a reference to the same underlying data.
287#[derive(Debug, Clone)]
288pub(crate) struct SingleTransport {
289 pub(crate) generation: usize,
290 pub(crate) client: Http<Client>,
291 pub(crate) status: Arc<RwLock<SingleTransportStatus>>,
292 /// Time at which to revert back to the primary provider after a failover.
293 pub(crate) revert_at: Option<Instant>,
294}
295
296/// The status of a single transport
297#[derive(Debug, Default)]
298pub(crate) struct SingleTransportStatus {
299 pub(crate) last_failure: Option<Instant>,
300 pub(crate) consecutive_failures: usize,
301 pub(crate) rate_limited_until: Option<Instant>,
302 /// Whether or not this current transport is being shut down (switching to the next transport)
303 pub(crate) shutting_down: bool,
304}