hotshot_query_service/data_source/storage/
ledger_log.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::{collections::VecDeque, fmt::Debug};
16
17use atomic_store::{
18    AppendLog, AtomicStoreLoader, PersistenceError, append_log, load_store::BincodeLoadStore,
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::{debug, instrument, warn};
22
23use crate::node::{ResourceSyncStatus, SyncStatus, SyncStatusRange};
24
25/// A caching append log for ledger objects.
26#[derive(Debug)]
27pub(crate) struct LedgerLog<T: Serialize + DeserializeOwned> {
28    cache_start: usize,
29    cache_size: usize,
30    cache: VecDeque<Option<T>>,
31    store: AppendLog<BincodeLoadStore<Option<T>>>,
32    // Keep track of the number of appended objects which have not yet been committed. We need this
33    // to detect when we are inserting at the end of the log or in the middle, as the two casese are
34    // handled differently and `self.store.iter().len()` does not update until a new version is
35    // committed.
36    pending_inserts: usize,
37    // Track the number of missing objects, for health reporting.
38    missing: usize,
39}
40
41impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
42    pub(crate) fn create(
43        loader: &mut AtomicStoreLoader,
44        file_pattern: &str,
45        cache_size: usize,
46    ) -> Result<Self, PersistenceError> {
47        Ok(Self {
48            cache_start: 0,
49            cache_size,
50            cache: VecDeque::with_capacity(cache_size),
51            store: AppendLog::create(
52                loader,
53                Default::default(),
54                file_pattern,
55                10u64 << 20, // 10 MB
56            )?,
57            pending_inserts: 0,
58            missing: 0,
59        })
60    }
61
62    pub(crate) fn open(
63        loader: &mut AtomicStoreLoader,
64        file_pattern: &str,
65        cache_size: usize,
66    ) -> Result<Self, PersistenceError> {
67        let store = AppendLog::load(
68            loader,
69            Default::default(),
70            file_pattern,
71            1u64 << 20, // 1 MB
72        )?;
73        let len = store.iter().len();
74        tracing::info!("loading LedgerLog {}, len={}", file_pattern, len);
75
76        let cache_start = len.saturating_sub(cache_size);
77        let mut missing = 0;
78        let mut cache = store
79            .iter()
80            .skip(cache_start)
81            .map(|r| {
82                if let Err(e) = &r {
83                    warn!("failed to load object. Error: {}", e);
84                }
85                // We treat missing objects and failed-to-load objects the same:
86                // if we failed to load a object, it is now missing!
87                let obj = r.ok().flatten();
88                if obj.is_none() {
89                    missing += 1;
90                }
91                obj
92            })
93            .collect::<VecDeque<_>>();
94        cache.reserve_exact(cache_size - cache.len());
95
96        Ok(Self {
97            cache_start,
98            cache_size,
99            cache,
100            store,
101            pending_inserts: 0,
102            missing,
103        })
104    }
105
106    pub(crate) fn iter(&self) -> Iter<'_, T> {
107        Iter {
108            index: 0,
109            cache_start: self.cache_start,
110            cache: &self.cache,
111            store: self.store.iter(),
112        }
113    }
114
115    pub(crate) fn store_resource(&mut self, resource: Option<T>) -> Result<(), PersistenceError> {
116        let missing = resource.is_none();
117        self.store.store_resource(&resource)?;
118        self.pending_inserts += 1;
119        if missing {
120            self.missing += 1;
121        }
122        if self.cache.len() >= self.cache_size {
123            self.cache.pop_front();
124            self.cache_start += 1;
125        }
126        self.cache.push_back(resource);
127        Ok(())
128    }
129
130    /// Insert an object at position `index`.
131    ///
132    /// Returns whether the object was newly inserted; that is, returns `false` if and only if there
133    /// was already an object present at this index.
134    pub(crate) fn insert(&mut self, index: usize, object: T) -> Result<bool, PersistenceError>
135    where
136        T: Debug,
137    {
138        // If there are missing objects between what we currently have and `object`, pad with
139        // placeholders.
140        let len = self.iter().len();
141        let target_len = std::cmp::max(index, len);
142        for i in len..target_len {
143            debug!("storing placeholders for position {i}/{target_len}");
144            if let Err(err) = self.store_resource(None) {
145                warn!("Failed to store placeholder: {}", err);
146                return Err(err);
147            }
148        }
149        assert!(target_len >= index);
150        if target_len == index {
151            // This is the next object in the chain, append it to the log.
152            if let Err(err) = self.store_resource(Some(object)) {
153                warn!("Failed to store object at index {}: {}", index, err);
154                return Err(err);
155            }
156            Ok(true)
157        } else if matches!(self.iter().nth(index), Some(Some(_))) {
158            // This is a duplicate, we don't have to insert anything.
159            Ok(false)
160        } else {
161            // This is an object earlier in the chain that we are now receiving asynchronously.
162            // Update the placeholder with the actual contents of the object.
163            // TODO update persistent storage once AppendLog supports updates.
164            // See: https://github.com/EspressoSystems/hotshot-query-service/issues/16
165            warn!(
166                index,
167                len, target_len, "skipping out-of-order object; random inserts not yet supported"
168            );
169
170            // TODO Update the object in cache if necessary. Note that we could do this now, even
171            // without support for storing the object persistently. But this makes the cache out of
172            // sync with persistent storage, and it means we have an object available that will
173            // become unavailable once it drops out of cache, which is not really what we want.
174            // See: https://github.com/EspressoSystems/hotshot-query-service/issues/16
175            // if index >= self.cache_start {
176            //     self.cache[index - self.cache_start] = Some(object);
177            // }
178            Ok(true)
179        }
180    }
181
182    pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> {
183        tracing::debug!("committing new version of LedgerLog");
184        self.store.commit_version()?;
185        self.pending_inserts = 0;
186        Ok(())
187    }
188
189    pub(crate) fn skip_version(&mut self) -> Result<(), PersistenceError> {
190        self.store.skip_version()
191    }
192
193    pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> {
194        self.store.revert_version()?;
195
196        // Remove objects which were inserted in cache but not committed to storage.
197        for _ in 0..self.pending_inserts {
198            self.cache.pop_back();
199        }
200
201        self.pending_inserts = 0;
202        Ok(())
203    }
204
205    /// Find the sync status of this resource.
206    ///
207    /// This function will find all consecutive ranges of a given [`SyncStatus`] in
208    /// `[0, block_height)`.
209    #[instrument(skip(self))]
210    pub(crate) fn sync_status(&self, start: usize, end: usize) -> ResourceSyncStatus {
211        let mut missing = 0;
212        let mut ranges = vec![];
213
214        // Iterate over all objects, finding ranges of consecutive present objects. In between each
215        // present range, we will interpolate a missing range.
216        let mut curr: Option<SyncStatusRange> = None;
217        for (i, obj) in self.iter().enumerate().skip(start) {
218            if obj.is_none() {
219                tracing::debug!(i, "skipping placeholder object");
220                continue;
221            }
222
223            let prev = if let Some(range) = &mut curr {
224                if i == range.end {
225                    // This object extends the current range of present objects.
226                    range.end += 1;
227                    continue;
228                }
229
230                // This object starts a new range of present objects. Save the previous range.
231                ranges.push(*range);
232                range.end
233            } else {
234                0
235            };
236
237            if i > prev {
238                // Insert a missing range between the previous present range and the newfound
239                // present object.
240                ranges.push(SyncStatusRange {
241                    start: prev,
242                    end: i,
243                    status: SyncStatus::Missing,
244                });
245                missing += i - prev;
246            }
247
248            // Start a new range of present objects.
249            curr = Some(SyncStatusRange {
250                start: i,
251                end: i + 1,
252                status: SyncStatus::Present,
253            });
254        }
255
256        // Save the last present range.
257        if let Some(range) = curr {
258            ranges.push(range);
259        }
260
261        // Insert a potential missing range at the end of the chain.
262        let prev = match ranges.last() {
263            Some(range) => range.end,
264            None => 0,
265        };
266        if prev < end {
267            ranges.push(SyncStatusRange {
268                start: prev,
269                end,
270                status: SyncStatus::Missing,
271            });
272            missing += end - prev;
273        }
274
275        ResourceSyncStatus { missing, ranges }
276    }
277}
278
279pub struct Iter<'a, T: Serialize + DeserializeOwned> {
280    index: usize,
281    cache_start: usize,
282    cache: &'a VecDeque<Option<T>>,
283    store: append_log::Iter<'a, BincodeLoadStore<Option<T>>>,
284}
285
286impl<T: Serialize + DeserializeOwned + Clone> Iterator for Iter<'_, T> {
287    type Item = Option<T>;
288
289    fn next(&mut self) -> Option<Self::Item> {
290        // False positive: clippy suggests `self.next()` instead of `self.nth(0)`, but that would be
291        // recursive.
292        #[allow(clippy::iter_nth_zero)]
293        self.nth(0)
294    }
295
296    fn size_hint(&self) -> (usize, Option<usize>) {
297        // Include objects in cache that haven't necessarily been committed to storage yet. This is
298        // consistent with `nth`, which will yield such objects.
299        let len = (self.cache_start + self.cache.len()).saturating_sub(self.index);
300        (len, Some(len))
301    }
302
303    fn nth(&mut self, n: usize) -> Option<Self::Item> {
304        self.index += n;
305        let res = if self.index >= self.cache_start {
306            // Get the object from cache if we can.
307            self.cache.get(self.index - self.cache_start).cloned()
308        } else {
309            // Otherwise load from storage.
310            self.store.nth(n).map(|res| {
311                if let Err(e) = &res {
312                    warn!("failed to load object at position {}: error {}", n, e);
313                }
314                // Both a failed load and a successful load of `None` are treated the same: as
315                // missing data, so we yield `None`. The latter case can happen if there was a
316                // previous failed load and we marked this entry as explicitly missing.
317                res.ok().flatten()
318            })
319        };
320
321        self.index += 1;
322        res
323    }
324
325    fn count(self) -> usize {
326        self.size_hint().0
327    }
328}
329
330impl<T: Serialize + DeserializeOwned + Clone> ExactSizeIterator for Iter<'_, T> {}
331
#[cfg(test)]
mod test {
    use atomic_store::AtomicStore;
    use tempfile::TempDir;

    use super::*;

    /// Commit a new version of `log`, followed by a new version of the `store` that owns it.
    async fn commit(log: &mut LedgerLog<u64>, store: &mut AtomicStore) {
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_creation() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let expected = (0..5).map(Some).collect::<Vec<Option<u64>>>();

        // Create and populate a log, committing after every append.
        {
            let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
            let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
            let mut store = AtomicStore::open(loader).unwrap();
            for value in 0..5 {
                log.store_resource(Some(value)).unwrap();
                commit(&mut log, &mut store).await;
            }
        }

        // Reopen the log from storage and check that its contents survived.
        {
            let mut loader = AtomicStoreLoader::load(dir.path(), "test_ledger_log").unwrap();
            let log = LedgerLog::<u64>::open(&mut loader, "ledger", 3).unwrap();
            AtomicStore::open(loader).unwrap();
            assert_eq!(log.iter().collect::<Vec<_>>(), expected);
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_insert() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        assert!(log.iter().collect::<Vec<_>>().is_empty());

        // Append at the current end of the log.
        log.insert(0, 1).unwrap();
        commit(&mut log, &mut store).await;
        assert_eq!(log.iter().collect::<Vec<_>>(), [Some(1)]);

        // Insert past the end, which pads the gap with placeholders.
        log.insert(4, 2).unwrap();
        commit(&mut log, &mut store).await;
        assert_eq!(
            log.iter().collect::<Vec<_>>(),
            [Some(1), None, None, None, Some(2)]
        );

        // Insert in the middle, at a position that is still cached.
        log.insert(2, 3).unwrap();
        commit(&mut log, &mut store).await;
        // TODO re-enable this check once AppendLog supports random access updates.
        // See https://github.com/EspressoSystems/hotshot-query-service/issues/16
        // assert_eq!(
        //     log.iter().collect::<Vec<_>>(),
        //     vec![Some(1), None, Some(3), None, Some(2)]
        // );

        // Insert in the middle, at a position that has dropped out of cache.
        log.insert(1, 4).unwrap();
        commit(&mut log, &mut store).await;
        // TODO check results once AppendLog supports random access updates.
        // See https://github.com/EspressoSystems/hotshot-query-service/issues/16
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_iter() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        for value in 0..5 {
            log.store_resource(Some(value)).unwrap();
            commit(&mut log, &mut store).await;
        }

        assert_eq!(log.iter().len(), 5);
        for (pos, value) in (0..5u64).enumerate() {
            let mut iter = log.iter();
            assert_eq!(iter.nth(pos).unwrap(), Some(value), "{log:?}");

            // `nth` must also advance the iterator past the element it returned.
            let rest: Vec<_> = (value + 1..5).map(Some).collect();
            assert_eq!(iter.collect::<Vec<_>>(), rest);
        }
        assert_eq!(log.iter().nth(5), None);
    }
}