hotshot_query_service/data_source/
fs.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::path::Path;
16
17use atomic_store::AtomicStoreLoader;
18use hotshot_types::traits::node_implementation::NodeType;
19
20pub use super::storage::fs::Transaction;
21use super::{AvailabilityProvider, FetchingDataSource, storage::FileSystemStorage};
22use crate::{
23    Header, Payload,
24    availability::{QueryableHeader, query_data::QueryablePayload},
25    data_source::fetching,
26};
27
/// A data source for the APIs provided in this crate, backed by the local file system.
///
/// This is a [`FetchingDataSource`] whose storage layer is [`FileSystemStorage`].
///
/// Synchronization and atomicity of persisted data structures are provided via [`atomic_store`].
/// The methods [`commit`](super::Transaction::commit), [`revert`](super::Transaction::revert), and
/// [`skip_version`](Self::skip_version) of this type and its associated [`Transaction`] type can be
/// used to control synchronization in the underlying [`AtomicStore`](atomic_store::AtomicStore).
///
/// Note that because [`AtomicStore`](atomic_store::AtomicStore) only allows changes to be made to
/// the underlying store, a [`Transaction`] takes full control of the whole store, and does not
/// permit concurrent readers or other transactions while in flight. This is enforced internally via
/// a global `RwLock`, and is a significant downside of this storage implementation, compared to the
/// more relaxed concurrency semantics of a SQL implementation.
///
/// # Extension and Composition
///
/// [`FileSystemDataSource`] is designed to be both extensible (so you can add additional state to
/// the API modules defined in this crate) and composable (so you can use [`FileSystemDataSource`]
/// as one component of a larger state type for an application with additional modules).
///
/// ## Extension
///
/// Adding additional, application-specific state to [`FileSystemDataSource`] is possible by
/// wrapping it in [`ExtensibleDataSource`](super::ExtensibleDataSource):
///
/// ```
/// # use hotshot_query_service::data_source::{ExtensibleDataSource, FileSystemDataSource};
/// # use hotshot_query_service::fetching::provider::NoFetching;
/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
/// # use std::path::Path;
/// # async fn doc(storage_path: &Path) -> Result<(), anyhow::Error> {
/// type AppState = &'static str;
///
/// let data_source: ExtensibleDataSource<FileSystemDataSource<AppTypes, NoFetching>, AppState> =
///     ExtensibleDataSource::new(FileSystemDataSource::create(storage_path, NoFetching).await?, "app state");
/// # Ok(())
/// # }
/// ```
///
/// The [`ExtensibleDataSource`](super::ExtensibleDataSource) wrapper implements all the same data
/// source traits as [`FileSystemDataSource`], and also provides access to the `AppState` parameter
/// for use in API endpoint handlers. This can be used to implement an app-specific data source
/// trait and add a new API endpoint that uses this app-specific data, as described in the
/// [extension guide](crate#extension).
///
/// ## Composition
///
/// Composing [`FileSystemDataSource`] with other module states is in principle simple -- just
/// create an aggregate struct containing both [`FileSystemDataSource`] and your additional module
/// states. A complication arises from how persistent storage is managed: if other modules have
/// their own persistent state, should the storage of [`FileSystemDataSource`] and the other modules
/// be completely independent, or synchronized under the control of a single
/// [`AtomicStore`](atomic_store::AtomicStore)? [`FileSystemDataSource`] supports both patterns:
/// when you create it with [`create`](Self::create) or [`open`](Self::open), it will open its own
/// [`AtomicStore`](atomic_store::AtomicStore) and manage the synchronization of its own storage,
/// independent of any other persistent data it might be composed with. But when you create it with
/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
/// you may ask it to register its persistent data structures with an existing
/// [`AtomicStoreLoader`]. If you register other modules' persistent data structures with the same
/// loader, you can create one [`AtomicStore`](atomic_store::AtomicStore) that synchronizes all the
/// persistent data. Note, though, that when you choose to use
/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
/// you become responsible for ensuring that calls to
/// [`AtomicStore::commit_version`](atomic_store::AtomicStore::commit_version) alternate with calls
/// to [`commit`](super::Transaction::commit) or [`skip_version`](Self::skip_version).
///
/// In the following example, we compose HotShot query service modules with other
/// application-specific modules, using a single top-level
/// [`AtomicStore`](atomic_store::AtomicStore) to synchronize all persistent storage.
///
/// In the event loop below, an event whose `update` fails is skipped entirely: neither
/// [`commit`](super::Transaction::commit) nor
/// [`AtomicStore::commit_version`](atomic_store::AtomicStore::commit_version) is called for it, so
/// the required alternation between the two is preserved.
///
/// ```
/// # use atomic_store::{AtomicStore, AtomicStoreLoader};
/// # use futures::StreamExt;
/// # use hotshot::types::SystemContextHandle;
/// # use hotshot_query_service::Error;
/// # use hotshot_query_service::data_source::{
/// #   FileSystemDataSource, Transaction, UpdateDataSource, VersionedDataSource,
/// # };
/// # use hotshot_query_service::fetching::provider::NoFetching;
/// # use hotshot_query_service::testing::mocks::{
/// #   MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions
/// # };
/// # use hotshot_example_types::node_types::TestVersions;
/// # use std::{path::Path, sync::Arc};
/// # use tide_disco::App;
/// # use tokio::{spawn, sync::RwLock};
/// # use vbs::version::StaticVersionType;
/// struct AppState {
///     // Top-level storage coordinator
///     store: AtomicStore,
///     hotshot_qs: FileSystemDataSource<AppTypes, NoFetching>,
///     // additional state for other modules
/// }
///
/// async fn init_server<Ver: StaticVersionType + 'static>(
///     storage_path: &Path,
///     hotshot: SystemContextHandle<AppTypes, AppNodeImpl, AppVersions>,
/// ) -> anyhow::Result<App<Arc<RwLock<AppState>>, Error>> {
///     let mut loader = AtomicStoreLoader::create(storage_path, "my_app")?; // or `open`
///     let hotshot_qs = FileSystemDataSource::create_with_store(&mut loader, NoFetching)
///         .await?;
///     // Initialize storage for other modules using the same loader.
///
///     let store = AtomicStore::open(loader)?;
///     let state = Arc::new(RwLock::new(AppState {
///         store,
///         hotshot_qs,
///         // additional state for other modules
///     }));
///     let mut app = App::with_state(state.clone());
///     // Register API modules.
///
///     spawn(async move {
///         let mut events = hotshot.event_stream();
///         while let Some(event) = events.next().await {
///             let mut state = state.write().await;
///             if state.hotshot_qs.update(&event).await.is_err() {
///                 continue;
///             }
///
///             // Update other modules' states based on `event`.
///             let mut tx = state.hotshot_qs.write().await.unwrap();
///             // Do updates
///             tx.commit().await.unwrap();
///
///             // Commit or skip versions for other modules' storage.
///             state.store.commit_version().unwrap();
///         }
///     });
///
///     Ok(app)
/// }
/// ```
pub type FileSystemDataSource<Types, P> = FetchingDataSource<Types, FileSystemStorage<Types>, P>;
161
162/// Builder for configuring a [`FileSystemDataSource`].
163pub type Builder<Types, P> = fetching::Builder<Types, FileSystemStorage<Types>, P>;
164
165impl<Types: NodeType, P> FileSystemDataSource<Types, P>
166where
167    Payload<Types>: QueryablePayload<Types>,
168    Header<Types>: QueryableHeader<Types>,
169    P: AvailabilityProvider<Types>,
170{
171    /// Create a new [FileSystemDataSource] with storage at `path`.
172    ///
173    /// If there is already data at `path`, it will be archived.
174    ///
175    /// The [FileSystemDataSource] will manage its own persistence synchronization.
176    pub async fn create(path: &Path, provider: P) -> anyhow::Result<Self> {
177        Self::create_builder(path, provider).await?.build().await
178    }
179
180    /// Create a new [FileSystemDataSource] with storage at `path`, and return a builder for
181    /// configuration.
182    ///
183    /// If there is already data at `path`, it will be archived.
184    ///
185    /// The [FileSystemDataSource] will manage its own persistence synchronization.
186    pub async fn create_builder(path: &Path, provider: P) -> anyhow::Result<Builder<Types, P>> {
187        Ok(FileSystemDataSource::builder(
188            FileSystemStorage::create(path).await?,
189            provider,
190        ))
191    }
192
193    /// Open an existing [FileSystemDataSource] from storage at `path`.
194    ///
195    /// If there is no data at `path`, a new store will be created.
196    ///
197    /// The [FileSystemDataSource] will manage its own persistence synchronization.
198    pub async fn open(path: &Path, provider: P) -> anyhow::Result<Self> {
199        Self::open_builder(path, provider).await?.build().await
200    }
201
202    /// Open an existing [FileSystemDataSource] from storage at `path`, and return a builder for
203    /// configuration.
204    ///
205    /// If there is no data at `path`, a new store will be created.
206    ///
207    /// The [FileSystemDataSource] will manage its own persistence synchronization.
208    pub async fn open_builder(path: &Path, provider: P) -> anyhow::Result<Builder<Types, P>> {
209        Ok(FileSystemDataSource::builder(
210            FileSystemStorage::open(path).await?,
211            provider,
212        ))
213    }
214
215    /// Create a new [FileSystemDataSource] using a persistent storage loader.
216    ///
217    /// If there is existing data corresponding to the [FileSystemDataSource] data structures, it
218    /// will be archived.
219    ///
220    /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
221    /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
222    /// and managing synchronization of the store.
223    pub async fn create_with_store(
224        loader: &mut AtomicStoreLoader,
225        provider: P,
226    ) -> anyhow::Result<Self> {
227        Self::create_builder_with_store(loader, provider)
228            .await?
229            .build()
230            .await
231    }
232
233    /// Create a new [FileSystemDataSource] using a persistent storage loader, and return a builder
234    /// for configuration.
235    ///
236    /// If there is existing data corresponding to the [FileSystemDataSource] data structures, it
237    /// will be archived.
238    ///
239    /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
240    /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
241    /// and managing synchronization of the store.
242    pub async fn create_builder_with_store(
243        loader: &mut AtomicStoreLoader,
244        provider: P,
245    ) -> anyhow::Result<Builder<Types, P>> {
246        Ok(FileSystemDataSource::builder(
247            FileSystemStorage::create_with_store(loader).await?,
248            provider,
249        ))
250    }
251
252    /// Open an existing [FileSystemDataSource] using a persistent storage loader.
253    ///
254    /// If there is no existing data corresponding to the [FileSystemDataSource] data structures, a
255    /// new store will be created.
256    ///
257    /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
258    /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
259    /// and managing synchronization of the store.
260    pub async fn open_with_store(
261        loader: &mut AtomicStoreLoader,
262        provider: P,
263    ) -> anyhow::Result<Self> {
264        Self::open_builder_with_store(loader, provider)
265            .await?
266            .build()
267            .await
268    }
269
270    /// Open an existing [FileSystemDataSource] using a persistent storage loader, and return a
271    /// builder for configuration.
272    ///
273    /// If there is no existing data corresponding to the [FileSystemDataSource] data structures, a
274    /// new store will be created.
275    ///
276    /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
277    /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
278    /// and managing synchronization of the store.
279    pub async fn open_builder_with_store(
280        loader: &mut AtomicStoreLoader,
281        provider: P,
282    ) -> anyhow::Result<Builder<Types, P>> {
283        Ok(FileSystemDataSource::builder(
284            FileSystemStorage::open_with_store(loader).await?,
285            provider,
286        ))
287    }
288
289    /// Advance the version of the persistent store without committing changes to persistent state.
290    ///
291    /// This function is useful when the [AtomicStore](atomic_store::AtomicStore) synchronizing
292    /// storage for this [FileSystemDataSource] is being managed by the caller. The caller may want
293    /// to persist some changes to other modules whose state is managed by the same
294    /// [AtomicStore](atomic_store::AtomicStore). In order to call
295    /// [AtomicStore::commit_version](atomic_store::AtomicStore::commit_version), the version of
296    /// this [FileSystemDataSource] must be advanced, either by [commit](super::Transaction::commit)
297    /// or, if there are no outstanding changes, [skip_version](Self::skip_version).
298    pub async fn skip_version(&self) -> anyhow::Result<()> {
299        self.as_ref().skip_version().await?;
300        Ok(())
301    }
302}
303
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use async_trait::async_trait;
    use hotshot::types::Event;
    use tempfile::TempDir;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
    };

    /// Lets the shared data source test harness drive a [`FileSystemDataSource`], with each node's
    /// storage living in its own temporary directory.
    #[async_trait]
    impl<P: AvailabilityProvider<MockTypes> + Default> DataSourceLifeCycle
        for FileSystemDataSource<MockTypes, P>
    {
        type Storage = TempDir;
        type S = FileSystemStorage<MockTypes>;
        type P = P;

        /// Allocate a temporary directory, prefixed with `node_id`, to hold this node's data.
        async fn create(node_id: usize) -> Self::Storage {
            let prefix = format!("file_system_data_source_{node_id}");
            TempDir::with_prefix(prefix).unwrap()
        }

        /// Open (or initialize) a data source in `storage`. Before building, `opt` may customize
        /// the builder.
        async fn build(
            storage: &Self::Storage,
            opt: impl Send + FnOnce(Builder<MockTypes, Self::P>) -> Builder<MockTypes, Self::P>,
        ) -> Self {
            let builder = Self::open_builder(storage.path(), P::default())
                .await
                .unwrap();
            opt(builder).build().await.unwrap()
        }

        /// Recreate the data source in `storage`. This goes through the inherent `create`
        /// constructor, which archives any existing data.
        async fn reset(storage: &Self::Storage) -> Self {
            Self::create(storage.path(), P::default()).await.unwrap()
        }

        /// Apply `event` to the data source, panicking if the update returns an error.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            self.update(event).await.unwrap();
        }
    }
}
351
#[cfg(test)]
mod test {
    // The glob import from the crate root is what brings `instantiate_data_source_tests!` into
    // scope. NOTE(review): presumably the macro is `#[macro_export]`ed, which places it at the
    // crate root rather than in its defining module. Confirm against its definition; if so,
    // `use crate::instantiate_data_source_tests;` would be a narrower alternative.
    use crate::*;
    use crate::{fetching::provider::NoFetching, testing::mocks::MockTypes};

    use super::FileSystemDataSource;

    // Run the shared data source test suite against the file system backend, using the
    // `NoFetching` provider.
    instantiate_data_source_tests!(FileSystemDataSource<MockTypes, NoFetching>);
}