hotshot_query_service/data_source/fs.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::path::Path;
16
17use atomic_store::AtomicStoreLoader;
18use hotshot_types::traits::node_implementation::NodeType;
19
20pub use super::storage::fs::Transaction;
21use super::{AvailabilityProvider, FetchingDataSource, storage::FileSystemStorage};
22use crate::{
23 Header, Payload,
24 availability::{QueryableHeader, query_data::QueryablePayload},
25 data_source::fetching,
26};
27
28/// A data source for the APIs provided in this crate, backed by the local file system.
29///
30/// Synchronization and atomicity of persisted data structures are provided via [`atomic_store`].
31/// The methods [`commit`](super::Transaction::commit), [`revert`](super::Transaction::revert), and
32/// [`skip_version`](Self::skip_version) of this type and its associated [`Transaction`] type can be
33/// used to control synchronization in the underlying [`AtomicStore`](atomic_store::AtomicStore).
34///
35/// Note that because [`AtomicStore`](atomic_store::AtomicStore) only allows changes to be made to
36/// the underlying store, a [`Transaction`] takes full control of the whole store, and does not
37/// permit concurrent readers or other transactions while in flight. This is enforced internally via
38/// a global `RwLock`, and is a significant downside of this storage implementation, compared to the
39/// more relaxed concurrency semantics of a SQL implementation.
40///
41/// # Extension and Composition
42///
43/// [`FileSystemDataSource`] is designed to be both extensible (so you can add additional state to
44/// the API modules defined in this crate) and composable (so you can use [`FileSystemDataSource`]
45/// as one component of a larger state type for an application with additional modules).
46///
47/// ## Extension
48///
49/// Adding additional, application-specific state to [`FileSystemDataSource`] is possible by
50/// wrapping it in [`ExtensibleDataSource`](super::ExtensibleDataSource):
51///
52/// ```
53/// # use hotshot_query_service::data_source::{ExtensibleDataSource, FileSystemDataSource};
54/// # use hotshot_query_service::fetching::provider::NoFetching;
55/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
56/// # use std::path::Path;
57/// # async fn doc(storage_path: &Path) -> Result<(), anyhow::Error> {
58/// type AppState = &'static str;
59///
60/// let data_source: ExtensibleDataSource<FileSystemDataSource<AppTypes, NoFetching>, AppState> =
61/// ExtensibleDataSource::new(FileSystemDataSource::create(storage_path, NoFetching).await?, "app state");
62/// # Ok(())
63/// # }
64/// ```
65///
66/// The [`ExtensibleDataSource`](super::ExtensibleDataSource) wrapper implements all the same data
67/// source traits as [`FileSystemDataSource`], and also provides access to the `AppState` parameter
68/// for use in API endpoint handlers. This can be used to implement an app-specific data source
69/// trait and add a new API endpoint that uses this app-specific data, as described in the
70/// [extension guide](crate#extension).
71///
72/// ## Composition
73///
74/// Composing [`FileSystemDataSource`] with other module states is in principle simple -- just
75/// create an aggregate struct containing both [`FileSystemDataSource`] and your additional module
76/// states. A complication arises from how persistent storage is managed: if other modules have
77/// their own persistent state, should the storage of [`FileSystemDataSource`] and the other modules
78/// be completely independent, or synchronized under the control of a single
79/// [`AtomicStore`](atomic_store::AtomicStore)? [`FileSystemDataSource`] supports both patterns:
80/// when you create it with [`create`](Self::create) or [`open`](Self::open), it will open its own
81/// [`AtomicStore`](atomic_store::AtomicStore) and manage the synchronization of its own storage,
82/// independent of any other persistent data it might be composed with. But when you create it with
83/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
84/// you may ask it to register its persistent data structures with an existing
85/// [`AtomicStoreLoader`]. If you register other modules' persistent data structures with the same
86/// loader, you can create one [`AtomicStore`](atomic_store::AtomicStore) that synchronizes all the
87/// persistent data. Note, though, that when you choose to use
88/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
89/// you become responsible for ensuring that calls to
90/// [`AtomicStore::commit_version`](atomic_store::AtomicStore::commit_version) alternate with calls
91/// to [`commit`](super::Transaction::commit) or [`skip_version`](Self::skip_version).
92///
93/// In the following example, we compose HotShot query service modules with other application-
94/// specific modules, using a single top-level [`AtomicStore`](atomic_store::AtomicStore) to
95/// synchronize all persistent storage.
96///
97/// ```
98/// # use atomic_store::{AtomicStore, AtomicStoreLoader};
99/// # use futures::StreamExt;
100/// # use hotshot::types::SystemContextHandle;
101/// # use hotshot_query_service::Error;
102/// # use hotshot_query_service::data_source::{
103/// # FileSystemDataSource, Transaction, UpdateDataSource, VersionedDataSource,
104/// # };
105/// # use hotshot_query_service::fetching::provider::NoFetching;
106/// # use hotshot_query_service::testing::mocks::{
107/// # MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions
108/// # };
109/// # use hotshot_example_types::node_types::TestVersions;
110/// # use std::{path::Path, sync::Arc};
111/// # use tide_disco::App;
112/// # use tokio::{spawn, sync::RwLock};
113/// # use vbs::version::StaticVersionType;
114/// struct AppState {
115/// // Top-level storage coordinator
116/// store: AtomicStore,
117/// hotshot_qs: FileSystemDataSource<AppTypes, NoFetching>,
118/// // additional state for other modules
119/// }
120///
121/// async fn init_server<Ver: StaticVersionType + 'static>(
122/// storage_path: &Path,
123/// hotshot: SystemContextHandle<AppTypes, AppNodeImpl, AppVersions>,
124/// ) -> anyhow::Result<App<Arc<RwLock<AppState>>, Error>> {
125/// let mut loader = AtomicStoreLoader::create(storage_path, "my_app")?; // or `open`
126/// let hotshot_qs = FileSystemDataSource::create_with_store(&mut loader, NoFetching)
127/// .await?;
128/// // Initialize storage for other modules using the same loader.
129///
130/// let store = AtomicStore::open(loader)?;
131/// let state = Arc::new(RwLock::new(AppState {
132/// store,
133/// hotshot_qs,
134/// // additional state for other modules
135/// }));
136/// let mut app = App::with_state(state.clone());
137/// // Register API modules.
138///
139/// spawn(async move {
140/// let mut events = hotshot.event_stream();
141/// while let Some(event) = events.next().await {
142/// let mut state = state.write().await;
143/// if state.hotshot_qs.update(&event).await.is_err() {
144/// continue;
145/// }
146///
147/// // Update other modules' states based on `event`.
148/// let mut tx = state.hotshot_qs.write().await.unwrap();
149/// // Do updates
150/// tx.commit().await.unwrap();
151///
152/// // Commit or skip versions for other modules' storage.
153/// state.store.commit_version().unwrap();
154/// }
155/// });
156///
157/// Ok(app)
158/// }
159/// ```
160pub type FileSystemDataSource<Types, P> = FetchingDataSource<Types, FileSystemStorage<Types>, P>;
161
162/// Builder for configuring a [`FileSystemDataSource`].
163pub type Builder<Types, P> = fetching::Builder<Types, FileSystemStorage<Types>, P>;
164
165impl<Types: NodeType, P> FileSystemDataSource<Types, P>
166where
167 Payload<Types>: QueryablePayload<Types>,
168 Header<Types>: QueryableHeader<Types>,
169 P: AvailabilityProvider<Types>,
170{
171 /// Create a new [FileSystemDataSource] with storage at `path`.
172 ///
173 /// If there is already data at `path`, it will be archived.
174 ///
175 /// The [FileSystemDataSource] will manage its own persistence synchronization.
176 pub async fn create(path: &Path, provider: P) -> anyhow::Result<Self> {
177 Self::create_builder(path, provider).await?.build().await
178 }
179
180 /// Create a new [FileSystemDataSource] with storage at `path`, and return a builder for
181 /// configuration.
182 ///
183 /// If there is already data at `path`, it will be archived.
184 ///
185 /// The [FileSystemDataSource] will manage its own persistence synchronization.
186 pub async fn create_builder(path: &Path, provider: P) -> anyhow::Result<Builder<Types, P>> {
187 Ok(FileSystemDataSource::builder(
188 FileSystemStorage::create(path).await?,
189 provider,
190 ))
191 }
192
193 /// Open an existing [FileSystemDataSource] from storage at `path`.
194 ///
195 /// If there is no data at `path`, a new store will be created.
196 ///
197 /// The [FileSystemDataSource] will manage its own persistence synchronization.
198 pub async fn open(path: &Path, provider: P) -> anyhow::Result<Self> {
199 Self::open_builder(path, provider).await?.build().await
200 }
201
202 /// Open an existing [FileSystemDataSource] from storage at `path`, and return a builder for
203 /// configuration.
204 ///
205 /// If there is no data at `path`, a new store will be created.
206 ///
207 /// The [FileSystemDataSource] will manage its own persistence synchronization.
208 pub async fn open_builder(path: &Path, provider: P) -> anyhow::Result<Builder<Types, P>> {
209 Ok(FileSystemDataSource::builder(
210 FileSystemStorage::open(path).await?,
211 provider,
212 ))
213 }
214
215 /// Create a new [FileSystemDataSource] using a persistent storage loader.
216 ///
217 /// If there is existing data corresponding to the [FileSystemDataSource] data structures, it
218 /// will be archived.
219 ///
220 /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
221 /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
222 /// and managing synchronization of the store.
223 pub async fn create_with_store(
224 loader: &mut AtomicStoreLoader,
225 provider: P,
226 ) -> anyhow::Result<Self> {
227 Self::create_builder_with_store(loader, provider)
228 .await?
229 .build()
230 .await
231 }
232
233 /// Create a new [FileSystemDataSource] using a persistent storage loader, and return a builder
234 /// for configuration.
235 ///
236 /// If there is existing data corresponding to the [FileSystemDataSource] data structures, it
237 /// will be archived.
238 ///
239 /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
240 /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
241 /// and managing synchronization of the store.
242 pub async fn create_builder_with_store(
243 loader: &mut AtomicStoreLoader,
244 provider: P,
245 ) -> anyhow::Result<Builder<Types, P>> {
246 Ok(FileSystemDataSource::builder(
247 FileSystemStorage::create_with_store(loader).await?,
248 provider,
249 ))
250 }
251
252 /// Open an existing [FileSystemDataSource] using a persistent storage loader.
253 ///
254 /// If there is no existing data corresponding to the [FileSystemDataSource] data structures, a
255 /// new store will be created.
256 ///
257 /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
258 /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
259 /// and managing synchronization of the store.
260 pub async fn open_with_store(
261 loader: &mut AtomicStoreLoader,
262 provider: P,
263 ) -> anyhow::Result<Self> {
264 Self::open_builder_with_store(loader, provider)
265 .await?
266 .build()
267 .await
268 }
269
270 /// Open an existing [FileSystemDataSource] using a persistent storage loader, and return a
271 /// builder for configuration.
272 ///
273 /// If there is no existing data corresponding to the [FileSystemDataSource] data structures, a
274 /// new store will be created.
275 ///
276 /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
277 /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
278 /// and managing synchronization of the store.
279 pub async fn open_builder_with_store(
280 loader: &mut AtomicStoreLoader,
281 provider: P,
282 ) -> anyhow::Result<Builder<Types, P>> {
283 Ok(FileSystemDataSource::builder(
284 FileSystemStorage::open_with_store(loader).await?,
285 provider,
286 ))
287 }
288
289 /// Advance the version of the persistent store without committing changes to persistent state.
290 ///
291 /// This function is useful when the [AtomicStore](atomic_store::AtomicStore) synchronizing
292 /// storage for this [FileSystemDataSource] is being managed by the caller. The caller may want
293 /// to persist some changes to other modules whose state is managed by the same
294 /// [AtomicStore](atomic_store::AtomicStore). In order to call
295 /// [AtomicStore::commit_version](atomic_store::AtomicStore::commit_version), the version of
296 /// this [FileSystemDataSource] must be advanced, either by [commit](super::Transaction::commit)
297 /// or, if there are no outstanding changes, [skip_version](Self::skip_version).
298 pub async fn skip_version(&self) -> anyhow::Result<()> {
299 self.as_ref().skip_version().await?;
300 Ok(())
301 }
302}
303
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use async_trait::async_trait;
    use hotshot::types::Event;
    use tempfile::TempDir;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
    };

    /// Test life-cycle hooks for a [`FileSystemDataSource`] over [`MockTypes`], with each
    /// instance stored in its own temporary directory.
    #[async_trait]
    impl<P: AvailabilityProvider<MockTypes> + Default> DataSourceLifeCycle
        for FileSystemDataSource<MockTypes, P>
    {
        type Storage = TempDir;
        type S = FileSystemStorage<MockTypes>;
        type P = P;

        /// Allocate a temporary directory whose name prefix embeds `node_id`.
        async fn create(node_id: usize) -> Self::Storage {
            let prefix = format!("file_system_data_source_{node_id}");
            TempDir::with_prefix(prefix).unwrap()
        }

        /// Open a builder over `storage`, let `opt` adjust it, then build the data source.
        async fn build(
            storage: &Self::Storage,
            opt: impl Send + FnOnce(Builder<MockTypes, Self::P>) -> Builder<MockTypes, Self::P>,
        ) -> Self {
            let builder = Self::open_builder(storage.path(), P::default())
                .await
                .unwrap();
            let configured = opt(builder);
            configured.build().await.unwrap()
        }

        /// Re-create the data source at the directory held by `storage`, using a default
        /// provider.
        async fn reset(storage: &Self::Storage) -> Self {
            let recreated = Self::create(storage.path(), P::default()).await;
            recreated.unwrap()
        }

        /// Feed `event` into the data source, panicking if the update fails.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            let outcome = self.update(event).await;
            outcome.unwrap();
        }
    }
}
351
#[cfg(test)]
mod test {
    // The test-suite macro used below is defined in another module of this crate; a glob import
    // of the crate root is the only way found so far to bring it into scope here.
    use crate::*;

    use crate::fetching::provider::NoFetching;
    use crate::testing::mocks::MockTypes;

    use super::FileSystemDataSource;

    instantiate_data_source_tests!(FileSystemDataSource<MockTypes, NoFetching>);
}