hotshot_query_service/data_source/storage/
ledger_log.rs1#![cfg(feature = "file-system-data-source")]
14
15use std::{collections::VecDeque, fmt::Debug};
16
17use atomic_store::{
18 AppendLog, AtomicStoreLoader, PersistenceError, append_log, load_store::BincodeLoadStore,
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::{debug, instrument, warn};
22
23use crate::node::{ResourceSyncStatus, SyncStatus, SyncStatusRange};
24
25#[derive(Debug)]
27pub(crate) struct LedgerLog<T: Serialize + DeserializeOwned> {
28 cache_start: usize,
29 cache_size: usize,
30 cache: VecDeque<Option<T>>,
31 store: AppendLog<BincodeLoadStore<Option<T>>>,
32 pending_inserts: usize,
37 missing: usize,
39}
40
41impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
42 pub(crate) fn create(
43 loader: &mut AtomicStoreLoader,
44 file_pattern: &str,
45 cache_size: usize,
46 ) -> Result<Self, PersistenceError> {
47 Ok(Self {
48 cache_start: 0,
49 cache_size,
50 cache: VecDeque::with_capacity(cache_size),
51 store: AppendLog::create(
52 loader,
53 Default::default(),
54 file_pattern,
55 10u64 << 20, )?,
57 pending_inserts: 0,
58 missing: 0,
59 })
60 }
61
62 pub(crate) fn open(
63 loader: &mut AtomicStoreLoader,
64 file_pattern: &str,
65 cache_size: usize,
66 ) -> Result<Self, PersistenceError> {
67 let store = AppendLog::load(
68 loader,
69 Default::default(),
70 file_pattern,
71 1u64 << 20, )?;
73 let len = store.iter().len();
74 tracing::info!("loading LedgerLog {}, len={}", file_pattern, len);
75
76 let cache_start = len.saturating_sub(cache_size);
77 let mut missing = 0;
78 let mut cache = store
79 .iter()
80 .skip(cache_start)
81 .map(|r| {
82 if let Err(e) = &r {
83 warn!("failed to load object. Error: {}", e);
84 }
85 let obj = r.ok().flatten();
88 if obj.is_none() {
89 missing += 1;
90 }
91 obj
92 })
93 .collect::<VecDeque<_>>();
94 cache.reserve_exact(cache_size - cache.len());
95
96 Ok(Self {
97 cache_start,
98 cache_size,
99 cache,
100 store,
101 pending_inserts: 0,
102 missing,
103 })
104 }
105
106 pub(crate) fn iter(&self) -> Iter<'_, T> {
107 Iter {
108 index: 0,
109 cache_start: self.cache_start,
110 cache: &self.cache,
111 store: self.store.iter(),
112 }
113 }
114
115 pub(crate) fn store_resource(&mut self, resource: Option<T>) -> Result<(), PersistenceError> {
116 let missing = resource.is_none();
117 self.store.store_resource(&resource)?;
118 self.pending_inserts += 1;
119 if missing {
120 self.missing += 1;
121 }
122 if self.cache.len() >= self.cache_size {
123 self.cache.pop_front();
124 self.cache_start += 1;
125 }
126 self.cache.push_back(resource);
127 Ok(())
128 }
129
130 pub(crate) fn insert(&mut self, index: usize, object: T) -> Result<bool, PersistenceError>
135 where
136 T: Debug,
137 {
138 let len = self.iter().len();
141 let target_len = std::cmp::max(index, len);
142 for i in len..target_len {
143 debug!("storing placeholders for position {i}/{target_len}");
144 if let Err(err) = self.store_resource(None) {
145 warn!("Failed to store placeholder: {}", err);
146 return Err(err);
147 }
148 }
149 assert!(target_len >= index);
150 if target_len == index {
151 if let Err(err) = self.store_resource(Some(object)) {
153 warn!("Failed to store object at index {}: {}", index, err);
154 return Err(err);
155 }
156 Ok(true)
157 } else if matches!(self.iter().nth(index), Some(Some(_))) {
158 Ok(false)
160 } else {
161 warn!(
166 index,
167 len, target_len, "skipping out-of-order object; random inserts not yet supported"
168 );
169
170 Ok(true)
179 }
180 }
181
182 pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> {
183 tracing::debug!("committing new version of LedgerLog");
184 self.store.commit_version()?;
185 self.pending_inserts = 0;
186 Ok(())
187 }
188
189 pub(crate) fn skip_version(&mut self) -> Result<(), PersistenceError> {
190 self.store.skip_version()
191 }
192
193 pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> {
194 self.store.revert_version()?;
195
196 for _ in 0..self.pending_inserts {
198 self.cache.pop_back();
199 }
200
201 self.pending_inserts = 0;
202 Ok(())
203 }
204
205 #[instrument(skip(self))]
210 pub(crate) fn sync_status(&self, start: usize, end: usize) -> ResourceSyncStatus {
211 let mut missing = 0;
212 let mut ranges = vec![];
213
214 let mut curr: Option<SyncStatusRange> = None;
217 for (i, obj) in self.iter().enumerate().skip(start) {
218 if obj.is_none() {
219 tracing::debug!(i, "skipping placeholder object");
220 continue;
221 }
222
223 let prev = if let Some(range) = &mut curr {
224 if i == range.end {
225 range.end += 1;
227 continue;
228 }
229
230 ranges.push(*range);
232 range.end
233 } else {
234 0
235 };
236
237 if i > prev {
238 ranges.push(SyncStatusRange {
241 start: prev,
242 end: i,
243 status: SyncStatus::Missing,
244 });
245 missing += i - prev;
246 }
247
248 curr = Some(SyncStatusRange {
250 start: i,
251 end: i + 1,
252 status: SyncStatus::Present,
253 });
254 }
255
256 if let Some(range) = curr {
258 ranges.push(range);
259 }
260
261 let prev = match ranges.last() {
263 Some(range) => range.end,
264 None => 0,
265 };
266 if prev < end {
267 ranges.push(SyncStatusRange {
268 start: prev,
269 end,
270 status: SyncStatus::Missing,
271 });
272 missing += end - prev;
273 }
274
275 ResourceSyncStatus { missing, ranges }
276 }
277}
278
279pub struct Iter<'a, T: Serialize + DeserializeOwned> {
280 index: usize,
281 cache_start: usize,
282 cache: &'a VecDeque<Option<T>>,
283 store: append_log::Iter<'a, BincodeLoadStore<Option<T>>>,
284}
285
286impl<T: Serialize + DeserializeOwned + Clone> Iterator for Iter<'_, T> {
287 type Item = Option<T>;
288
289 fn next(&mut self) -> Option<Self::Item> {
290 #[allow(clippy::iter_nth_zero)]
293 self.nth(0)
294 }
295
296 fn size_hint(&self) -> (usize, Option<usize>) {
297 let len = (self.cache_start + self.cache.len()).saturating_sub(self.index);
300 (len, Some(len))
301 }
302
303 fn nth(&mut self, n: usize) -> Option<Self::Item> {
304 self.index += n;
305 let res = if self.index >= self.cache_start {
306 self.cache.get(self.index - self.cache_start).cloned()
308 } else {
309 self.store.nth(n).map(|res| {
311 if let Err(e) = &res {
312 warn!("failed to load object at position {}: error {}", n, e);
313 }
314 res.ok().flatten()
318 })
319 };
320
321 self.index += 1;
322 res
323 }
324
325 fn count(self) -> usize {
326 self.size_hint().0
327 }
328}
329
330impl<T: Serialize + DeserializeOwned + Clone> ExactSizeIterator for Iter<'_, T> {}
331
#[cfg(test)]
mod test {
    use atomic_store::AtomicStore;
    use tempfile::TempDir;

    use super::*;

    /// Objects committed in one session are all visible after the log is reopened.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_creation() {
        let tmp = TempDir::with_prefix("test_ledger_log").unwrap();

        // First session: create the log and commit five objects, one version each.
        {
            let mut loader = AtomicStoreLoader::create(tmp.path(), "test_ledger_log").unwrap();
            let mut ledger = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
            let mut atomic_store = AtomicStore::open(loader).unwrap();
            for value in 0..5 {
                ledger.store_resource(Some(value)).unwrap();
                ledger.commit_version().await.unwrap();
                atomic_store.commit_version().unwrap();
            }
        }

        // Second session: reopen the log and read everything back.
        {
            let mut loader = AtomicStoreLoader::load(tmp.path(), "test_ledger_log").unwrap();
            let ledger = LedgerLog::<u64>::open(&mut loader, "ledger", 3).unwrap();
            AtomicStore::open(loader).unwrap();
            let actual: Vec<_> = ledger.iter().collect();
            let expected: Vec<_> = (0..5).map(Some).collect();
            assert_eq!(actual, expected);
        }
    }

    /// Inserting past the end pads with placeholders; inserts at already-padded positions are
    /// accepted without error.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_insert() {
        let tmp = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(tmp.path(), "test_ledger_log").unwrap();
        let mut ledger = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut atomic_store = AtomicStore::open(loader).unwrap();
        let empty = Vec::<Option<u64>>::new();
        assert_eq!(ledger.iter().collect::<Vec<_>>(), empty);

        // Insert at the end of an empty log.
        ledger.insert(0, 1).unwrap();
        ledger.commit_version().await.unwrap();
        atomic_store.commit_version().unwrap();
        assert_eq!(ledger.iter().collect::<Vec<_>>(), vec![Some(1)]);

        // Insert beyond the end: positions 1 through 3 become placeholders.
        ledger.insert(4, 2).unwrap();
        ledger.commit_version().await.unwrap();
        atomic_store.commit_version().unwrap();
        let padded = vec![Some(1), None, None, None, Some(2)];
        assert_eq!(ledger.iter().collect::<Vec<_>>(), padded);

        // Inserts at placeholder positions must not fail.
        ledger.insert(2, 3).unwrap();
        ledger.commit_version().await.unwrap();
        atomic_store.commit_version().unwrap();
        ledger.insert(1, 4).unwrap();
        ledger.commit_version().await.unwrap();
        atomic_store.commit_version().unwrap();
    }

    /// `len`, `nth` and continued iteration agree on both sides of the cache boundary.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_iter() {
        let tmp = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(tmp.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut atomic_store = AtomicStore::open(loader).unwrap();
        for value in 0..5 {
            log.store_resource(Some(value)).unwrap();
            log.commit_version().await.unwrap();
            atomic_store.commit_version().unwrap();
        }

        assert_eq!(log.iter().len(), 5);
        for i in 0..5 {
            let mut iter = log.iter();
            // Jump straight to position `i`...
            assert_eq!(iter.nth(i as usize).unwrap(), Some(i), "{log:?}");

            // ...and check that the rest of the log follows.
            let rest: Vec<_> = iter.collect();
            let expected_rest: Vec<_> = (i + 1..5).map(Some).collect();
            assert_eq!(rest, expected_rest);
        }
        assert_eq!(log.iter().nth(5), None);
    }
}