hotshot_query_service/data_source/storage/sql/queries/
node.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Node storage implementation for a database query engine.
14
15use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22    data::VidShare,
23    simple_certificate::CertificatePair,
24    traits::{block_contents::BlockHeader, node_implementation::NodeType},
25};
26use snafu::OptionExt;
27use tracing::instrument;
28
29use super::{
30    super::transaction::{Transaction, TransactionMode, Write, query, query_as},
31    DecodeError, HEADER_COLUMNS, QueryBuilder, parse_header,
32};
33use crate::{
34    Header, MissingSnafu, QueryError, QueryResult,
35    availability::{NamespaceId, QueryableHeader},
36    data_source::storage::{
37        Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38    },
39    node::{
40        BlockId, ResourceSyncStatus, SyncStatus, SyncStatusQueryData, SyncStatusRange,
41        TimeWindowQueryData, WindowStart,
42    },
43    types::HeightIndexed,
44};
45
#[async_trait]
impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
where
    Mode: TransactionMode,
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// The total number of blocks in the database.
    ///
    /// This is one more than the maximum height in the `header` table, or 0 if the table is
    /// empty.
    async fn block_height(&mut self) -> QueryResult<usize> {
        match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
            .fetch_one(self.as_mut())
            .await?
        {
            (Some(height),) => {
                // The height of the block is the number of blocks below it, so the total number of
                // blocks is one more than the height of the highest block.
                Ok(height as usize + 1)
            },
            (None,) => {
                // If there are no blocks yet, the height is 0.
                Ok(0)
            },
        }
    }

    /// Count the transactions in blocks whose heights fall in `range`, optionally restricted to
    /// a single namespace.
    ///
    /// The `aggregate` table stores cumulative counts per height, so the count for a range is
    /// the difference between the cumulative count at the range's end and the cumulative count
    /// just before its start. Returns 0 if the range is empty or no aggregates exist yet.
    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // `None` (statistics across all namespaces) is stored under the sentinel namespace -1,
        // since `namespace` is part of the table's primary key and cannot be NULL.
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            return Ok(0);
        };
        // Cumulative transaction count at the (inclusive) end of the range.
        let (count,) = query_as::<(i64,)>(
            "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut count = count as usize;

        if from > 0 {
            // Subtract the cumulative count just before the start of the range, so only
            // transactions within [from, to] remain.
            let (prev_count,) = query_as::<(i64,)>(
                "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            count = count.saturating_sub(prev_count as usize);
        }

        Ok(count)
    }

    /// Sum the payload sizes (in bytes) of blocks whose heights fall in `range`, optionally
    /// restricted to a single namespace.
    ///
    /// Works exactly like [`Self::count_transactions_in_range`], but over the cumulative
    /// `payload_size` column of the `aggregate` table.
    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // `None` (statistics across all namespaces) is stored under the sentinel namespace -1.
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            return Ok(0);
        };
        // Cumulative payload size at the (inclusive) end of the range.
        let (size,) = query_as::<(i64,)>(
            "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut size = size as usize;

        if from > 0 {
            // Subtract the cumulative size just before the start of the range.
            let (prev_size,) = query_as::<(i64,)>(
                "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            size = size.saturating_sub(prev_size as usize);
        }

        Ok(size)
    }

    /// Load this node's VID share for the block identified by `id`.
    ///
    /// Returns [`QueryError::Missing`] if the header row exists but its `vid_share` column is
    /// NULL, and a decode error if the stored bytes are not a valid bincode-encoded share.
    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        let mut query = QueryBuilder::default();
        let where_clause = query.header_where_clause(id.into())?;
        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
        // selecting by payload ID, as payloads are not unique), we return the first one.
        let sql = format!(
            "SELECT vid_share FROM header AS h
              WHERE {where_clause}
              ORDER BY h.height
              LIMIT 1"
        );
        let (share_data,) = query
            .query_as::<(Option<Vec<u8>>,)>(&sql)
            .fetch_one(self.as_mut())
            .await?;
        let share_data = share_data.context(MissingSnafu)?;
        let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
        Ok(share)
    }

    /// Report which leaves, blocks, and VID objects are present or missing in the height range
    /// `[from, to)`.
    async fn sync_status_for_range(
        &mut self,
        from: usize,
        to: usize,
    ) -> QueryResult<SyncStatusQueryData> {
        // A block can be missing if its corresponding header is missing or if the block's payload
        // information is missing.
        let blocks = self
            .sync_status_ranges(
                "header AS h JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, \
                 p.ns_table)",
                "height",
                from,
                to,
            )
            .await?;

        let leaves = if blocks.is_fully_synced() {
            // A common special case is that there are no missing blocks. In this case, we already
            // know there are no missing leaves either, since a block can only be present if we
            // already have the corresponding leaf. Just return the fully-synced status for leaves
            // without doing another expensive query.
            blocks.clone()
        } else {
            // A leaf can only be missing if there is no row for it in the database (all its columns
            // are non-nullable). We use `height` as an indicator for `NULL` rows in an inner join,
            // which allows an index-only scan.
            self.sync_status_ranges("leaf2", "height", from, to).await?
        };

        // VID common works just like blocks.
        let vid_common = self
            .sync_status_ranges(
                "header AS h JOIN vid_common AS v ON h.payload_hash = v.hash",
                "height",
                from,
                to,
            )
            .await?;

        Ok(SyncStatusQueryData {
            leaves,
            blocks,
            vid_common,
            pruned_height: None,
        })
    }

    /// Fetch a window of consecutive headers, starting from `start` (a height, block hash, or
    /// timestamp) and ending at the first block with timestamp >= `end`, returning at most
    /// `limit` headers.
    ///
    /// The response also includes the block just before the window (`prev`) and just after it
    /// (`next`), when available, so the caller can verify the window boundaries.
    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        // Find the specific block that starts the requested window.
        let first_block = match start.into() {
            WindowStart::Time(t) => {
                // If the request is not to start from a specific block, but from a timestamp, we
                // use a different method to find the window, as detecting whether we have
                // sufficient data to answer the query is not as simple as just trying `load_header`
                // for a specific block ID.
                return self.time_window::<Types>(t, end, limit).await;
            },
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
        };

        // Find all blocks starting from `first_block` with timestamps less than `end`. Block
        // timestamps are monotonically increasing, so this query is guaranteed to return a
        // contiguous range of blocks ordered by increasing height.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.height >= $1 AND h.timestamp < $2
              ORDER BY h.height
              LIMIT $3"
        );
        let rows = query(&sql)
            .bind(first_block as i64)
            .bind(end as i64)
            .bind(limit as i64)
            .fetch(self.as_mut());
        let window = rows
            .map(|row| parse_header::<Types>(row?))
            .try_collect::<Vec<_>>()
            .await?;

        // Find the block just before the window.
        let prev = if first_block > 0 {
            Some(self.load_header::<Types>(first_block as usize - 1).await?)
        } else {
            None
        };

        let next = if window.len() < limit {
            // If we are not limited, complete the window by finding the block just after the
            // window. We order by timestamp _then_ height, because the timestamp order allows the
            // query planner to use the index on timestamp to also efficiently solve the WHERE
            // clause, but this process may turn up multiple results, due to the 1-second resolution
            // of block timestamps. The final sort by height guarantees us a unique, deterministic
            // result (the first block with a given timestamp). This sort may not be able to use an
            // index, but it shouldn't be too expensive, since there will never be more than a
            // handful of blocks with the same timestamp.
            let sql = format!(
                "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp >= $1
              ORDER BY h.timestamp, h.height
              LIMIT 1"
            );
            query(&sql)
                .bind(end as i64)
                .fetch_optional(self.as_mut())
                .await?
                .map(parse_header::<Types>)
                .transpose()?
        } else {
            // If we have been limited, return a `null` next block indicating an incomplete window.
            // The client will have to query again with an adjusted starting point to get subsequent
            // results.
            tracing::debug!(limit, "cutting off header window request due to limit");
            None
        };

        Ok(TimeWindowQueryData { window, prev, next })
    }

    /// Load the most recently stored QC chain, if one has been saved.
    ///
    /// The `latest_qc_chain` table stores the chain as a JSON value; a decode error is returned
    /// if the stored JSON does not deserialize into a pair of certificates.
    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
        let Some((json,)) = query_as("SELECT qcs FROM latest_qc_chain LIMIT 1")
            .fetch_optional(self.as_mut())
            .await?
        else {
            return Ok(None);
        };
        let qcs = serde_json::from_value(json).decode_error("malformed QC")?;
        Ok(qcs)
    }
}
295
impl<Mode> Transaction<Mode>
where
    Mode: TransactionMode,
{
    /// Characterize consecutive ranges of objects in the given `height`-indexed table by status.
    ///
    /// This function will find all ranges in `[0, block_height)`. If `pruned_height` is specified,
    /// an initial range will be created with bounds `[0, pruned_height]` and status
    /// [`SyncStatus::Pruned`]. Then only the range `[pruned_height + 1, block_height)` will
    /// actually be searched.
    ///
    /// The search process uses an indexed outer self-join on `table`, which requires traversing
    /// the table twice. Thus, it can be fairly expensive on large tables, but it is still linear in
    /// the size of the table.
    ///
    /// The value of `indicator_column` in the outer join results is used to check for missing
    /// objects (indicated by a `NULL` value). If `indicator_column` is a `NOT NULL` column, such as
    /// `height`, then this function will only consider objects missing if there is no corresponding
    /// row in the database at all. However, `indicator_column` may also be a nullable column (such
    /// as `payload.data`, in which case objects are treated as missing if there is no corresponding
    /// row _or_ if there is a row but it has an explicit `NULL` value for `indicator_column`).
    ///
    /// NOTE: `table` and `indicator_column` are interpolated directly into SQL; they must only
    /// ever come from trusted, compile-time constants within this module, never from user input.
    #[instrument(skip(self))]
    async fn sync_status_ranges(
        &mut self,
        table: &str,
        indicator_column: &str,
        start: usize,
        end: usize,
    ) -> QueryResult<ResourceSyncStatus> {
        let mut ranges = vec![];
        tracing::debug!("searching for missing ranges");

        // Find every height in the range `[start, end)` which is the first height in a sequence of
        // present objects (i.e. the object just before it is missing).
        //
        // We do this by outer joining the table with itself, with height shifted by one, to get a
        // combined table where each row contains a successor object and its immediate predecessor,
        // if present. If the predecessor is missing, its height will be `NULL` (which is impossible
        // otherwise, because `height` is a `NOT NULL` column).
        //
        // For each table in the self-join, we _first_ sub-select just the range of interest (i.e.
        // [start, end) for the successor table and [start - 1, end - 1) for the predecessor table).
        // It is more efficient to do this first to reduce the number of rows involved in the join,
        // which is the expensive part of the operation. In fact, due to the nature of the outer
        // join, it is impossible to do this filtering after the join for the predecessor table,
        // since at that point the table will not necessarily be indexed and will contain some rows
        // with `NULL` height.
        let query = format!(
            "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
                WHERE height >= $1 AND height < $2)
            SELECT successor.height FROM range AS predecessor
            RIGHT JOIN range AS successor
            ON successor.height = predecessor.height + 1
            WHERE successor.indicator IS NOT NULL
              AND predecessor.indicator IS NULL
            ORDER BY successor.height"
        );
        let range_starts = query_as::<(i64,)>(&query)
            .bind(start as i64)
            .bind(end as i64)
            .fetch_all(self.as_mut())
            .await?;
        tracing::debug!(
            ?range_starts,
            "found {} starting heights for present ranges",
            range_starts.len()
        );

        let range_ends = if range_starts.len() <= 10 {
            // In the common case, where we are mostly or entirely synced and only missing a few
            // objects, chopping the space into only a small number of present ranges, it is more
            // efficient to pick out each range's end individually with a specific, efficient,
            // height-indexed query, rather than execute another very expensive query which is the
            // mirror of the `range_starts` query to load all the range ends in bulk.
            let mut ends = vec![];
            for (i, &(start,)) in range_starts.iter().enumerate() {
                // We can easily find the end of the range from the start by finding the maximum
                // height which is still present between the start and the next range's start.
                //
                // No explicit lower bound is needed here: `max` cannot land below `start` because
                // `start` itself is known to be present and below the upper bound.
                let query = format!(
                    "SELECT max(height) from {table}
                      WHERE height < $1 AND {indicator_column} IS NOT NULL"
                );
                let upper_bound = if i + 1 < range_starts.len() {
                    range_starts[i + 1].0
                } else {
                    end as i64
                };
                let (end,) = query_as::<(i64,)>(&query)
                    .bind(upper_bound)
                    // This query is guaranteed to return one result, since `start` satisfies the
                    // requirements even if nothing else does.
                    .fetch_one(self.as_mut())
                    .await?;
                tracing::debug!(start, end, "found end for present range");
                ends.push((end,));
            }
            ends
        } else {
            // When the number of distinct ranges becomes large, making many small queries to fetch
            // each specific range end becomes inefficient because it is dominated by overhead. In
            // this case, we fall back to fetching the range ends using a single moderately
            // expensive query, which is the mirror image of the query we used to fetch the range
            // starts.
            let query = format!(
                "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
                    WHERE height >= $1 AND height < $2)
                SELECT predecessor.height FROM range AS predecessor
                LEFT JOIN range AS successor
                ON successor.height = predecessor.height + 1
                WHERE predecessor.indicator IS NOT NULL
                  AND successor.indicator IS NULL
                ORDER BY predecessor.height"
            );
            let ends = query_as::<(i64,)>(&query)
                .bind(start as i64)
                .bind(end as i64)
                .fetch_all(self.as_mut())
                .await?;
            tracing::debug!(
                ?ends,
                "found {} ending heights for present ranges",
                ends.len()
            );
            ends
        };

        // Sanity check: every range has a start and an end.
        if range_starts.len() != range_ends.len() {
            return Err(QueryError::Error {
                message: format!(
                    "number of present range starts ({}) does not match number of present range \
                     ends ({})",
                    range_starts.len(),
                    range_ends.len(),
                ),
            });
        }

        // Now we can simply zip `range_starts` and `range_ends` to find the full sequence of
        // [`SyncStatus::Present`] ranges. We can then interpolate [`SyncStatus::Missing`] ranges
        // between each present range.
        let mut prev = start;
        for ((start,), (end,)) in range_starts.into_iter().zip(range_ends) {
            let start = start as usize;
            let end = end as usize;

            // Sanity check range bounds.
            if start < prev {
                return Err(QueryError::Error {
                    message: format!(
                        "found present ranges out of order: range start {start} is before \
                         previous range end {prev}"
                    ),
                });
            }
            if end < start {
                return Err(QueryError::Error {
                    message: format!("malformed range: start={start}, end={end}"),
                });
            }

            if start != prev {
                // There is a range in between this one and the previous one, which must correspond
                // to missing objects.
                tracing::debug!(start = prev, end = start, "found missing range");
                ranges.push(SyncStatusRange {
                    start: prev,
                    end: start,
                    status: SyncStatus::Missing,
                });
            }

            ranges.push(SyncStatusRange {
                start,
                end: end + 1, // convert inclusive range to exclusive
                status: SyncStatus::Present,
            });
            prev = end + 1;
        }

        // There is possibly one more missing range, between the final present range and the overall
        // block height.
        if prev != end {
            tracing::debug!(start = prev, end, "found missing range");
            ranges.push(SyncStatusRange {
                start: prev,
                end,
                status: SyncStatus::Missing,
            });
        }

        // Total count of missing objects is the combined width of all the missing ranges.
        let missing = ranges
            .iter()
            .filter_map(|range| {
                if range.status == SyncStatus::Missing {
                    Some(range.end - range.start)
                } else {
                    None
                }
            })
            .sum();
        tracing::debug!(
            missing,
            "found missing objects in {} total ranges",
            ranges.len()
        );

        Ok(ResourceSyncStatus { missing, ranges })
    }
}
506
507impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
508where
509    Types: NodeType,
510    Header<Types>: QueryableHeader<Types>,
511{
512    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
513        let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
514            .fetch_one(self.as_mut())
515            .await?;
516        Ok(height as usize)
517    }
518
519    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
520        // Get the maximum height for which we have stored aggregated results
521        // then query all the namespace info for that height
522        let res: (Option<i64>,) =
523            query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
524                .fetch_one(self.as_mut())
525                .await?;
526
527        let (Some(max_height),) = res else {
528            return Ok(None);
529        };
530
531        let rows: Vec<(i64, i64, i64)> = query_as(
532            r#"
533        SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
534        "#,
535        )
536        .bind(max_height)
537        .fetch_all(self.as_mut())
538        .await?;
539
540        let mut num_transactions = HashMap::default();
541        let mut payload_size = HashMap::default();
542
543        for (namespace_id, num_tx, payload_sz) in rows {
544            // Null namespace is represented as - 1 in database
545            // as it is part of primary key and primary key can not be NULL
546            // This namespace represents the cumulative sum of all the namespaces
547            let key = if namespace_id == -1 {
548                None
549            } else {
550                Some(namespace_id.into())
551            };
552            num_transactions.insert(key, num_tx as usize);
553            payload_size.insert(key, payload_sz as usize);
554        }
555
556        Ok(Some(Aggregate {
557            height: max_height,
558            num_transactions,
559            payload_size,
560        }))
561    }
562}
563
564impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
565where
566    Header<Types>: QueryableHeader<Types>,
567{
568    async fn update_aggregates(
569        &mut self,
570        prev: Aggregate<Types>,
571        blocks: &[PayloadMetadata<Types>],
572    ) -> anyhow::Result<Aggregate<Types>> {
573        let height = blocks[0].height();
574        let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
575
576        let mut rows = Vec::new();
577
578        // Cumulatively sum up new statistics for each block in this chunk.
579        let aggregates = blocks
580            .iter()
581            .scan(
582                (height, prev_tx_count, prev_size),
583                |(height, tx_count, size), block| {
584                    if *height != block.height {
585                        return Some(Err(anyhow!(
586                            "blocks in update_aggregates are not sequential; expected {}, got {}",
587                            *height,
588                            block.height()
589                        )));
590                    }
591                    *height += 1;
592
593                    //  Update total global stats
594                    // `None` represents stats across all namespaces.
595                    // It is represented as -1 in database
596
597                    *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
598                    *size.entry(None).or_insert(0) += block.size as usize;
599
600                    // Add row for global cumulative stats (namespace = -1)
601
602                    rows.push((
603                        block.height as i64,
604                        -1,
605                        tx_count[&None] as i64,
606                        size[&None] as i64,
607                    ));
608
609                    // Update per-namespace cumulative stats
610                    for (&ns_id, info) in &block.namespaces {
611                        let key = Some(ns_id);
612
613                        *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
614                        *size.entry(key).or_insert(0) += info.size as usize;
615                    }
616
617                    //  Insert cumulative stats for all known namespaces
618                    // Even if a namespace wasn't present in this block,
619                    // we still insert its latest cumulative stats at this height.
620                    for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
621                        let key = Some(*ns_id);
622                        rows.push((
623                            block.height as i64,
624                            (*ns_id).into(),
625                            tx_count[&key] as i64,
626                            size[&key] as i64,
627                        ));
628                    }
629
630                    Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
631                },
632            )
633            .collect::<anyhow::Result<Vec<_>>>()?;
634        let last_aggregate = aggregates.last().cloned();
635
636        let (height, num_transactions, payload_size) =
637            last_aggregate.ok_or_else(|| anyhow!("no row"))?;
638
639        self.upsert(
640            "aggregate",
641            ["height", "namespace", "num_transactions", "payload_size"],
642            ["height", "namespace"],
643            rows,
644        )
645        .await?;
646        Ok(Aggregate {
647            height,
648            num_transactions,
649            payload_size,
650        })
651    }
652}
653
impl<Mode: TransactionMode> Transaction<Mode> {
    /// Fetch the window of headers whose timestamps fall in `[start, end)`, returning at most
    /// `limit` headers, along with the neighboring headers just before (`prev`) and just after
    /// (`next`) the window when they exist.
    ///
    /// Returns [`QueryError::NotFound`] when the database does not yet contain enough blocks to
    /// determine where the window starts (no block in the window and none after it).
    async fn time_window<Types: NodeType>(
        &mut self,
        start: u64,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        // Find all blocks whose timestamps fall within the window [start, end). Block timestamps
        // are monotonically increasing, so this query is guaranteed to return a contiguous range of
        // blocks ordered by increasing height.
        //
        // We order by timestamp _then_ height, because the timestamp order allows the query planner
        // to use the index on timestamp to also efficiently solve the WHERE clause, but this
        // process may turn up multiple results, due to the 1-second resolution of block timestamps.
        // The final sort by height guarantees us a unique, deterministic result (the first block
        // with a given timestamp). This sort may not be able to use an index, but it shouldn't be
        // too expensive, since there will never be more than a handful of blocks with the same
        // timestamp.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp >= $1 AND h.timestamp < $2
              ORDER BY h.timestamp, h.height
              LIMIT $3"
        );
        let rows = query(&sql)
            .bind(start as i64)
            .bind(end as i64)
            .bind(limit as i64)
            .fetch(self.as_mut());
        let window: Vec<_> = rows
            .map(|row| parse_header::<Types>(row?))
            .try_collect()
            .await?;

        let next = if window.len() < limit {
            // If we are not limited, complete the window by finding the block just after.
            let sql = format!(
                "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp >= $1
              ORDER BY h.timestamp, h.height
              LIMIT 1"
            );
            query(&sql)
                .bind(end as i64)
                .fetch_optional(self.as_mut())
                .await?
                .map(parse_header::<Types>)
                .transpose()?
        } else {
            // If we have been limited, return a `null` next block indicating an incomplete window.
            // The client will have to query again with an adjusted starting point to get subsequent
            // results.
            tracing::debug!(limit, "cutting off header window request due to limit");
            None
        };

        // If the `next` block exists, _or_ if any block in the window exists, we know we have
        // enough information to definitively say at least where the window starts (we may or may
        // not have where it ends, depending on how many blocks have thus far been produced).
        // However, if we have neither a block in the window nor a block after it, we cannot say
        // whether the next block produced will have a timestamp before or after the window start.
        // In this case, we don't know what the `prev` field of the response should be, so we return
        // an error: the caller must try again after more blocks have been produced.
        if window.is_empty() && next.is_none() {
            return Err(QueryError::NotFound);
        }

        // Find the block just before the window: the last block (greatest timestamp, breaking
        // ties by greatest height) with a timestamp strictly before `start`.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp < $1
              ORDER BY h.timestamp DESC, h.height DESC
              LIMIT 1"
        );
        let prev = query(&sql)
            .bind(start as i64)
            .fetch_optional(self.as_mut())
            .await?
            .map(parse_header::<Types>)
            .transpose()?;

        Ok(TimeWindowQueryData { window, prev, next })
    }
}
741
742/// Get inclusive start and end bounds for a range to pull aggregate statistics.
743///
744/// Returns [`None`] if there are no blocks in the given range, in which case the result should be
745/// the default value of the aggregate statistic.
746async fn aggregate_range_bounds<Types>(
747    tx: &mut Transaction<impl TransactionMode>,
748    range: impl RangeBounds<usize>,
749) -> QueryResult<Option<(usize, usize)>>
750where
751    Types: NodeType,
752    Header<Types>: QueryableHeader<Types>,
753{
754    let from = match range.start_bound() {
755        Bound::Included(from) => *from,
756        Bound::Excluded(from) => *from + 1,
757        Bound::Unbounded => 0,
758    };
759    let to = match range.end_bound() {
760        Bound::Included(to) => *to,
761        Bound::Excluded(0) => return Ok(None),
762        Bound::Excluded(to) => *to - 1,
763        Bound::Unbounded => {
764            let height = AggregatesStorage::<Types>::aggregates_height(tx)
765                .await
766                .map_err(|err| QueryError::Error {
767                    message: format!("{err:#}"),
768                })?;
769            if height == 0 {
770                return Ok(None);
771            }
772            if height < from {
773                return Ok(None);
774            }
775            height - 1
776        },
777    };
778    Ok(Some((from, to)))
779}
780
#[cfg(test)]
mod test {
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::vid::advz::advz_scheme;
    use itertools::Itertools;
    use jf_advz::VidScheme;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::{
        availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
        data_source::{
            Transaction as _, VersionedDataSource,
            sql::testing::TmpDb,
            storage::{SqlStorage, StorageConnectionType, UpdateAvailabilityStorage},
        },
        testing::mocks::MockTypes,
    };

    /// Generic driver for `sync_status_ranges` tests.
    ///
    /// Inserts leaves for each `[start, end)` interval in `present_ranges` (the intervals must be
    /// sorted, non-overlapping, and contained in `start..end` — the verification loop below relies
    /// on this), then checks that `sync_status_ranges` over `start..end` reports exactly those
    /// intervals as present and the gaps between them as missing.
    async fn test_sync_status_ranges(start: usize, end: usize, present_ranges: &[(usize, usize)]) {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // Generate some mock leaves to insert. Each leaf is a copy of the previous one with only
        // the block number bumped, so leaf `i` sits at height `i`.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        for i in 1..end {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        // Set up: insert only the leaves that fall in one of the "present" intervals.
        {
            let mut tx = db.write().await.unwrap();

            for &(start, end) in present_ranges {
                for leaf in &leaves[start..end] {
                    tx.insert_leaf(leaf).await.unwrap();
                }
            }

            tx.commit().await.unwrap();
        }

        // Query sync status of the leaf table (`leaf2`, keyed by `height`) over `[start, end)`.
        let sync_status = db
            .read()
            .await
            .unwrap()
            .sync_status_ranges("leaf2", "height", start, end)
            .await
            .unwrap();

        // Verify missing: the missing count is the window size minus the number of inserted
        // leaves.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        let total = end - start;
        assert_eq!(sync_status.missing, total - present);

        // Verify ranges: walk the reported ranges in order, expecting a `Missing` range before
        // each present interval whenever there is a gap, then the `Present` interval itself.
        let mut ranges = sync_status.ranges.into_iter();
        let mut prev = start;
        for &(start, end) in present_ranges {
            if start != prev {
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: SyncStatus::Missing,
                    }
                );
            }
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // If the last present interval does not reach the end of the window, expect one final
        // `Missing` range covering the tail.
        if prev != end {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end,
                    status: SyncStatus::Missing,
                }
            );
        }

        assert_eq!(ranges.next(), None);
    }

    /// Window `[0, 6)` whose first and last blocks are present.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_present() {
        test_sync_status_ranges(0, 6, &[(0, 2), (4, 6)]).await;
    }

    /// Window `[0, 6)` whose first and last blocks are missing.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_missing() {
        test_sync_status_ranges(0, 6, &[(2, 4), (6, 8)]).await;
    }

    /// Like [`test_sync_status_ranges_bookends_present`] but the window starts at a nonzero
    /// height.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_present() {
        test_sync_status_ranges(1, 8, &[(2, 4), (6, 8)]).await;
    }

    /// Like [`test_sync_status_ranges_bookends_missing`] but the window starts at a nonzero
    /// height.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_missing() {
        test_sync_status_ranges(1, 8, &[(4, 6)]).await;
    }

    /// Present intervals that contain only a single block each.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_singleton_ranges() {
        test_sync_status_ranges(0, 3, &[(0, 1), (2, 3)]).await;
    }

    /// ~100 alternating single-block present/missing ranges; first and last blocks present.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_present() {
        let ranges = (0..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 201, &ranges).await;
    }

    /// ~100 alternating single-block present/missing ranges; first and last blocks missing.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_missing() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 202, &ranges).await;
    }

    /// Many alternating ranges with a window starting at a nonzero height.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_present() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 201, &ranges).await;
    }

    /// Many alternating ranges with a window starting at a nonzero height and a missing tail.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_missing() {
        let ranges = (2..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 202, &ranges).await;
    }

    /// Two blocks with identical payloads share the stored payload: inserting the payload for
    /// the second block makes it count as present for the first block too.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_duplicate_payload() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        // Create two blocks with the same (empty) payload.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Block 1 is a copy of the genesis block at height 1, reusing the genesis payload.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Insert the first leaf without payload or VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[0]).await.unwrap();
            tx.commit().await.unwrap();
        }

        // The block and VID data are missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                pruned_height: None,
            }
        );

        // Insert the second block with all data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[1]).await.unwrap();
            tx.insert_block(&blocks[1]).await.unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(&VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // The payload data is shared by both leaves, and nothing is missing.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: present.clone(),
                vid_common: present,
                pruned_height: None,
            }
        );
    }

    /// Byte-identical payloads whose headers carry different metadata are NOT deduplicated as
    /// block data (the interpretation differs), but VID data — which depends only on the raw
    /// payload bytes — is still shared between the two blocks.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_same_payload_different_ns_table() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        // Create two blocks with byte-identical payloads, but different namespace tables (meaning
        // the interpretation of the payload is different).
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Block 1 reuses the genesis payload bytes, but its header metadata is perturbed so the
        // payload interpretation differs.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        leaf.leaf.block_header_mut().metadata.num_transactions += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Insert the first leaf without payload or VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[0]).await.unwrap();
            tx.commit().await.unwrap();
        }

        // The block and VID data are missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                pruned_height: None,
            }
        );

        // Insert the second block with all data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[1]).await.unwrap();
            tx.insert_block(&blocks[1]).await.unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(&VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // The payload data cannot be shared because metadata (e.g. num_transactions) differs. VID,
        // on the other hand, is independent of the interpretation of the payload and is shared by
        // both leaves.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![
                SyncStatusRange {
                    status: SyncStatus::Missing,
                    start: 0,
                    end: 1,
                },
                SyncStatusRange {
                    status: SyncStatus::Present,
                    start: 1,
                    end: 2,
                },
            ],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: missing.clone(),
                vid_common: present,
                pruned_height: None,
            }
        );
    }
}