Skip to main content

hotshot_query_service/data_source/storage/sql/queries/
node.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Node storage implementation for a database query engine.
14
15use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22    data::VidShare,
23    simple_certificate::CertificatePair,
24    traits::{block_contents::BlockHeader, node_implementation::NodeType},
25};
26use snafu::OptionExt;
27use tracing::instrument;
28
29use super::{
30    super::transaction::{Transaction, TransactionMode, Write, query, query_as},
31    DecodeError, HEADER_COLUMNS, QueryBuilder, parse_header,
32};
33use crate::{
34    Header, MissingSnafu, QueryError, QueryResult,
35    availability::{Certificate2, NamespaceId, QueryableHeader},
36    data_source::storage::{
37        Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38    },
39    node::{
40        BlockId, ResourceSyncStatus, SyncStatus, SyncStatusQueryData, SyncStatusRange,
41        TimeWindowQueryData, WindowStart,
42    },
43    types::HeightIndexed,
44};
45
46#[async_trait]
47impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
48where
49    Mode: TransactionMode,
50    Types: NodeType,
51    Header<Types>: QueryableHeader<Types>,
52{
53    async fn block_height(&mut self) -> QueryResult<usize> {
54        match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
55            .fetch_one(self.as_mut())
56            .await?
57        {
58            (Some(height),) => {
59                // The height of the block is the number of blocks below it, so the total number of
60                // blocks is one more than the height of the highest block.
61                Ok(height as usize + 1)
62            },
63            (None,) => {
64                // If there are no blocks yet, the height is 0.
65                Ok(0)
66            },
67        }
68    }
69
70    async fn count_transactions_in_range(
71        &mut self,
72        range: impl RangeBounds<usize> + Send,
73        namespace: Option<NamespaceId<Types>>,
74    ) -> QueryResult<usize> {
75        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
76        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
77            return Ok(0);
78        };
79        let (count,) = query_as::<(i64,)>(
80            "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
81        )
82        .bind(to as i64)
83        .bind(namespace)
84        .fetch_one(self.as_mut())
85        .await?;
86        let mut count = count as usize;
87
88        if from > 0 {
89            let (prev_count,) = query_as::<(i64,)>(
90                "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
91            )
92            .bind((from - 1) as i64)
93            .bind(namespace)
94            .fetch_one(self.as_mut())
95            .await?;
96            count = count.saturating_sub(prev_count as usize);
97        }
98
99        Ok(count)
100    }
101
102    async fn payload_size_in_range(
103        &mut self,
104        range: impl RangeBounds<usize> + Send,
105        namespace: Option<NamespaceId<Types>>,
106    ) -> QueryResult<usize> {
107        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
108        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
109            return Ok(0);
110        };
111        let (size,) = query_as::<(i64,)>(
112            "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
113        )
114        .bind(to as i64)
115        .bind(namespace)
116        .fetch_one(self.as_mut())
117        .await?;
118        let mut size = size as usize;
119
120        if from > 0 {
121            let (prev_size,) = query_as::<(i64,)>(
122                "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
123            )
124            .bind((from - 1) as i64)
125            .bind(namespace)
126            .fetch_one(self.as_mut())
127            .await?;
128            size = size.saturating_sub(prev_size as usize);
129        }
130
131        Ok(size)
132    }
133
134    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
135    where
136        ID: Into<BlockId<Types>> + Send + Sync,
137    {
138        let mut query = QueryBuilder::default();
139        let where_clause = query.header_where_clause(id.into())?;
140        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
141        // selecting by payload ID, as payloads are not unique), we return the first one.
142        let sql = format!(
143            "SELECT vid_share FROM header AS h
144              WHERE {where_clause}
145              ORDER BY h.height
146              LIMIT 1"
147        );
148        let (share_data,) = query
149            .query_as::<(Option<Vec<u8>>,)>(&sql)
150            .fetch_one(self.as_mut())
151            .await?;
152        let share_data = share_data.context(MissingSnafu)?;
153        let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
154        Ok(share)
155    }
156
157    async fn sync_status_for_range(
158        &mut self,
159        from: usize,
160        to: usize,
161    ) -> QueryResult<SyncStatusQueryData> {
162        // A block can be missing if its corresponding header is missing or if the block's pyaload
163        // information is missing.
164        let blocks = self
165            .sync_status_ranges(
166                "header AS h JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, \
167                 p.ns_table)",
168                "height",
169                from,
170                to,
171            )
172            .await?;
173
174        let leaves = if blocks.is_fully_synced() {
175            // A common special case is that there are no missing blocks. In this case, we already
176            // know there are no missing leaves either, since a block can only be present if we
177            // already have the corresponding leaf. Just return the fully-synced status for leaves
178            // without doing another expensive query.
179            blocks.clone()
180        } else {
181            // A leaf can only be missing if there is no row for it in the database (all its columns
182            // are non-nullable). We use `height` as an indicator for `NULL` rows in an inner join,
183            // which allows an index-only scan.
184            self.sync_status_ranges("leaf2", "height", from, to).await?
185        };
186
187        // VID common works just like blocks.
188        let vid_common = self
189            .sync_status_ranges(
190                "header AS h JOIN vid_common AS v ON h.payload_hash = v.hash",
191                "height",
192                from,
193                to,
194            )
195            .await?;
196
197        Ok(SyncStatusQueryData {
198            leaves,
199            blocks,
200            vid_common,
201            pruned_height: None,
202        })
203    }
204
205    async fn get_header_window(
206        &mut self,
207        start: impl Into<WindowStart<Types>> + Send + Sync,
208        end: u64,
209        limit: usize,
210    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
211        // Find the specific block that starts the requested window.
212        let first_block = match start.into() {
213            WindowStart::Time(t) => {
214                // If the request is not to start from a specific block, but from a timestamp, we
215                // use a different method to find the window, as detecting whether we have
216                // sufficient data to answer the query is not as simple as just trying `load_header`
217                // for a specific block ID.
218                return self.time_window::<Types>(t, end, limit).await;
219            },
220            WindowStart::Height(h) => h,
221            WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
222        };
223
224        // Find all blocks starting from `first_block` with timestamps less than `end`. Block
225        // timestamps are monotonically increasing, so this query is guaranteed to return a
226        // contiguous range of blocks ordered by increasing height.
227        let sql = format!(
228            "SELECT {HEADER_COLUMNS}
229               FROM header AS h
230              WHERE h.height >= $1 AND h.timestamp < $2
231              ORDER BY h.height
232              LIMIT $3"
233        );
234        let rows = query(&sql)
235            .bind(first_block as i64)
236            .bind(end as i64)
237            .bind(limit as i64)
238            .fetch(self.as_mut());
239        let window = rows
240            .map(|row| parse_header::<Types, _>(row?))
241            .try_collect::<Vec<_>>()
242            .await?;
243
244        // Find the block just before the window.
245        let prev = if first_block > 0 {
246            Some(self.load_header::<Types>(first_block as usize - 1).await?)
247        } else {
248            None
249        };
250
251        let next = if window.len() < limit {
252            // If we are not limited, complete the window by finding the block just after the
253            // window. We order by timestamp _then_ height, because the timestamp order allows the
254            // query planner to use the index on timestamp to also efficiently solve the WHERE
255            // clause, but this process may turn up multiple results, due to the 1-second resolution
256            // of block timestamps. The final sort by height guarantees us a unique, deterministic
257            // result (the first block with a given timestamp). This sort may not be able to use an
258            // index, but it shouldn't be too expensive, since there will never be more than a
259            // handful of blocks with the same timestamp.
260            let sql = format!(
261                "SELECT {HEADER_COLUMNS}
262               FROM header AS h
263              WHERE h.timestamp >= $1
264              ORDER BY h.timestamp, h.height
265              LIMIT 1"
266            );
267            query(&sql)
268                .bind(end as i64)
269                .fetch_optional(self.as_mut())
270                .await?
271                .map(parse_header::<Types, _>)
272                .transpose()?
273        } else {
274            // If we have been limited, return a `null` next block indicating an incomplete window.
275            // The client will have to query again with an adjusted starting point to get subsequent
276            // results.
277            tracing::debug!(limit, "cutting off header window request due to limit");
278            None
279        };
280
281        Ok(TimeWindowQueryData { window, prev, next })
282    }
283
284    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
285        let Some((json,)) = query_as("SELECT qcs FROM latest_qc_chain LIMIT 1")
286            .fetch_optional(self.as_mut())
287            .await?
288        else {
289            return Ok(None);
290        };
291        let qcs = serde_json::from_value(json).decode_error("malformed QC")?;
292        Ok(qcs)
293    }
294
295    async fn load_cert2(&mut self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
296        let Some((json,)) = query_as("SELECT data FROM cert2 WHERE height = $1")
297            .bind(height as i64)
298            .fetch_optional(self.as_mut())
299            .await?
300        else {
301            return Ok(None);
302        };
303        let cert2 = serde_json::from_value(json).decode_error("malformed cert2")?;
304        Ok(cert2)
305    }
306
307    async fn load_earliest_cert2(
308        &mut self,
309        height: u64,
310    ) -> QueryResult<Option<Certificate2<Types>>> {
311        let Some((_h, json)): Option<(i64, serde_json::Value)> = query_as(
312            "SELECT height, data FROM cert2 WHERE height >= $1 ORDER BY height ASC LIMIT 1",
313        )
314        .bind(height as i64)
315        .fetch_optional(self.as_mut())
316        .await?
317        else {
318            return Ok(None);
319        };
320        let cert2 = serde_json::from_value(json).decode_error("malformed cert2")?;
321        Ok(Some(cert2))
322    }
323}
324
325impl<Mode> Transaction<Mode>
326where
327    Mode: TransactionMode,
328{
329    /// Characterize consecutive ranges of objects in the given `height`-indexed table by status.
330    ///
331    /// This function will find all ranges in `[0, block_height)`. If `pruned_height` is specified,
332    /// an initial range will be created with bounds `[0, pruned_height]` and status
333    /// [`SyncStatus::Pruned`]. Then only the range `[pruned_height + 1, block_height)` will
334    /// actually be searched.
335    ///
336    /// The search process uses an indexed outer self-join on `table`, which requires traversing
337    /// the table twice. Thus, it can be fairly expensive on large tables, but it is still linear in
338    /// the size of the table.
339    ///
340    /// The value of `indicator_column` in the outer join results is used to check for missing
341    /// objects (indicated by a `NULL` value). If `indicator_column` is a `NOT NULL` column, such as
342    /// `height`, then this function will only consider objects missing if there is no corresponding
343    /// row in the database at all. However, `indicator_column` may also be a nullable column (such
344    /// as `payload.data`, in which case objects are treated as missing if there is no corresponding
345    /// row _or_ if there is a row but it has an explicit `NULL` value for `indicator_column`).
346    #[instrument(skip(self))]
347    async fn sync_status_ranges(
348        &mut self,
349        table: &str,
350        indicator_column: &str,
351        start: usize,
352        end: usize,
353    ) -> QueryResult<ResourceSyncStatus> {
354        let mut ranges = vec![];
355        tracing::debug!("searching for missing ranges");
356
357        // Find every height in the range `[start, end)` which is the first height in a sequence of
358        // present objects (i.e. the object just before it is missing).
359        //
360        // We do this by outer joining the table with itself, with height shifted by one, to get a
361        // combined table where each row contains a successor object and its immediate predecessor,
362        // if present. If the predecessor is missing, its height will be `NULL` (which is impossible
363        // otherwise, because `height` is a `NOT NULL` column).
364        //
365        // For each table in the self-join, we _first_ sub-select just the range of interest (i.e.
366        // [start, end) for the successor table and [start - 1, end - 1) for the predecessor table).
367        // It is more efficient to do this first to reduce the number of rows involved in the join,
368        // which is the expensive part of the operation. In fact, due to the nature of the outer
369        // join, it is impossible to do this filtering after the join for the predecessor table,
370        // since at that point the table will not necessarily be indexed and will contain some rows
371        // with `NULL` height.
372        let query = format!(
373            "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
374                WHERE height >= $1 AND height < $2)
375            SELECT successor.height FROM range AS predecessor
376            RIGHT JOIN range AS successor
377            ON successor.height = predecessor.height + 1
378            WHERE successor.indicator IS NOT NULL
379              AND predecessor.indicator IS NULL
380            ORDER BY successor.height"
381        );
382        let range_starts = query_as::<(i64,)>(&query)
383            .bind(start as i64)
384            .bind(end as i64)
385            .fetch_all(self.as_mut())
386            .await?;
387        tracing::debug!(
388            ?range_starts,
389            "found {} starting heights for present ranges",
390            range_starts.len()
391        );
392
393        let range_ends = if range_starts.len() <= 10 {
394            // In the common case, where we are mostly or entirely synced and only missing a few
395            // objects, chopping the space into only a small number of present ranges, it is more
396            // efficient to pick out reach range's end individually with a specific, efficient,
397            // height-indexed query, rather than execute another very expensive query which is the
398            // mirror of the `range_starts` query to load all the range ends in bulk.
399            let mut ends = vec![];
400            for (i, &(start,)) in range_starts.iter().enumerate() {
401                // We can easily find the end of the range from the start by finding the maximum
402                // height which is still present between the start and the next range's start.
403                let query = format!(
404                    "SELECT height FROM {table}
405                      WHERE height < $1 AND {indicator_column} IS NOT NULL
406                      ORDER BY height DESC
407                      LIMIT 1"
408                );
409                let upper_bound = if i + 1 < range_starts.len() {
410                    range_starts[i + 1].0
411                } else {
412                    end as i64
413                };
414                let (end,) = query_as::<(i64,)>(&query)
415                    .bind(upper_bound)
416                    // This query is guaranteed to return one result, since `start` satisfies the
417                    // requirements even if nothing else does.
418                    .fetch_one(self.as_mut())
419                    .await?;
420                tracing::debug!(start, end, "found end for present range");
421                ends.push((end,));
422            }
423            ends
424        } else {
425            // When the number of distinct ranges becomes large, making many small queries to fetch
426            // each specific range end becomes inefficient because it is dominated by overhead. In
427            // this case, we fall back to fetching the range ends using a single moderately
428            // expensive query, which is the mirror image of the query we used to fetch the range
429            // starts.
430            let query = format!(
431                "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
432                    WHERE height >= $1 AND height < $2)
433                SELECT predecessor.height FROM range AS predecessor
434                LEFT JOIN range AS successor
435                ON successor.height = predecessor.height + 1
436                WHERE predecessor.indicator IS NOT NULL
437                  AND successor.indicator IS NULL
438                ORDER BY predecessor.height"
439            );
440            let ends = query_as::<(i64,)>(&query)
441                .bind(start as i64)
442                .bind(end as i64)
443                .fetch_all(self.as_mut())
444                .await?;
445            tracing::debug!(
446                ?ends,
447                "found {} ending heights for present ranges",
448                ends.len()
449            );
450            ends
451        };
452
453        // Sanity check: every range has a start and an end.
454        if range_starts.len() != range_ends.len() {
455            return Err(QueryError::Error {
456                message: format!(
457                    "number of present range starts ({}) does not match number of present range \
458                     ends ({})",
459                    range_starts.len(),
460                    range_ends.len(),
461                ),
462            });
463        }
464
465        // Now we can simply zip `range_starts` and `range_ends` to find the full sequence of
466        // [`SyncStatus::Present`] ranges. We can then interpolate [`SyncStatus::Missing`] ranges
467        // between each present range.
468        let mut prev = start;
469        for ((start,), (end,)) in range_starts.into_iter().zip(range_ends) {
470            let start = start as usize;
471            let end = end as usize;
472
473            // Sanity check range bounds.
474            if start < prev {
475                return Err(QueryError::Error {
476                    message: format!(
477                        "found present ranges out of order: range start {start} is before \
478                         previous range end {prev}"
479                    ),
480                });
481            }
482            if end < start {
483                return Err(QueryError::Error {
484                    message: format!("malformed range: start={start}, end={end}"),
485                });
486            }
487
488            if start != prev {
489                // There is a range in between this one and the previous one, which must correspond
490                // to missing objects.
491                tracing::debug!(start = prev, end = start, "found missing range");
492                ranges.push(SyncStatusRange {
493                    start: prev,
494                    end: start,
495                    status: SyncStatus::Missing,
496                });
497            }
498
499            ranges.push(SyncStatusRange {
500                start,
501                end: end + 1, // convert inclusive range to exclusive
502                status: SyncStatus::Present,
503            });
504            prev = end + 1;
505        }
506
507        // There is possibly one more missing range, between the final present range and the overall
508        // block height.
509        if prev != end {
510            tracing::debug!(start = prev, end, "found missing range");
511            ranges.push(SyncStatusRange {
512                start: prev,
513                end,
514                status: SyncStatus::Missing,
515            });
516        }
517
518        let missing = ranges
519            .iter()
520            .filter_map(|range| {
521                if range.status == SyncStatus::Missing {
522                    Some(range.end - range.start)
523                } else {
524                    None
525                }
526            })
527            .sum();
528        tracing::debug!(
529            missing,
530            "found missing objects in {} total ranges",
531            ranges.len()
532        );
533
534        Ok(ResourceSyncStatus { missing, ranges })
535    }
536}
537
538impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
539where
540    Types: NodeType,
541    Header<Types>: QueryableHeader<Types>,
542{
543    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
544        let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
545            .fetch_one(self.as_mut())
546            .await?;
547        Ok(height as usize)
548    }
549
550    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
551        // Get the maximum height for which we have stored aggregated results
552        // then query all the namespace info for that height
553        let res: (Option<i64>,) =
554            query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
555                .fetch_one(self.as_mut())
556                .await?;
557
558        let (Some(max_height),) = res else {
559            return Ok(None);
560        };
561
562        let rows: Vec<(i64, i64, i64)> = query_as(
563            r#"
564        SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
565        "#,
566        )
567        .bind(max_height)
568        .fetch_all(self.as_mut())
569        .await?;
570
571        let mut num_transactions = HashMap::default();
572        let mut payload_size = HashMap::default();
573
574        for (namespace_id, num_tx, payload_sz) in rows {
575            // Null namespace is represented as - 1 in database
576            // as it is part of primary key and primary key can not be NULL
577            // This namespace represents the cumulative sum of all the namespaces
578            let key = if namespace_id == -1 {
579                None
580            } else {
581                Some(namespace_id.into())
582            };
583            num_transactions.insert(key, num_tx as usize);
584            payload_size.insert(key, payload_sz as usize);
585        }
586
587        Ok(Some(Aggregate {
588            height: max_height,
589            num_transactions,
590            payload_size,
591        }))
592    }
593}
594
595impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
596where
597    Header<Types>: QueryableHeader<Types>,
598{
599    async fn update_aggregates(
600        &mut self,
601        prev: Aggregate<Types>,
602        blocks: &[PayloadMetadata<Types>],
603    ) -> anyhow::Result<Aggregate<Types>> {
604        let height = blocks[0].height();
605        let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
606
607        let mut rows = Vec::new();
608
609        // Cumulatively sum up new statistics for each block in this chunk.
610        let aggregates = blocks
611            .iter()
612            .scan(
613                (height, prev_tx_count, prev_size),
614                |(height, tx_count, size), block| {
615                    if *height != block.height {
616                        return Some(Err(anyhow!(
617                            "blocks in update_aggregates are not sequential; expected {}, got {}",
618                            *height,
619                            block.height()
620                        )));
621                    }
622                    *height += 1;
623
624                    //  Update total global stats
625                    // `None` represents stats across all namespaces.
626                    // It is represented as -1 in database
627
628                    *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
629                    *size.entry(None).or_insert(0) += block.size as usize;
630
631                    // Add row for global cumulative stats (namespace = -1)
632
633                    rows.push((
634                        block.height as i64,
635                        -1,
636                        tx_count[&None] as i64,
637                        size[&None] as i64,
638                    ));
639
640                    // Update per-namespace cumulative stats
641                    for (&ns_id, info) in &block.namespaces {
642                        let key = Some(ns_id);
643
644                        *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
645                        *size.entry(key).or_insert(0) += info.size as usize;
646                    }
647
648                    //  Insert cumulative stats for all known namespaces
649                    // Even if a namespace wasn't present in this block,
650                    // we still insert its latest cumulative stats at this height.
651                    for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
652                        let key = Some(*ns_id);
653                        rows.push((
654                            block.height as i64,
655                            (*ns_id).into(),
656                            tx_count[&key] as i64,
657                            size[&key] as i64,
658                        ));
659                    }
660
661                    Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
662                },
663            )
664            .collect::<anyhow::Result<Vec<_>>>()?;
665        let last_aggregate = aggregates.last().cloned();
666
667        let (height, num_transactions, payload_size) =
668            last_aggregate.ok_or_else(|| anyhow!("no row"))?;
669
670        self.upsert(
671            "aggregate",
672            ["height", "namespace", "num_transactions", "payload_size"],
673            ["height", "namespace"],
674            rows,
675        )
676        .await?;
677        Ok(Aggregate {
678            height,
679            num_transactions,
680            payload_size,
681        })
682    }
683}
684
685impl<Mode: TransactionMode> Transaction<Mode> {
686    async fn time_window<Types: NodeType>(
687        &mut self,
688        start: u64,
689        end: u64,
690        limit: usize,
691    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
692        // Find all blocks whose timestamps fall within the window [start, end). Block timestamps
693        // are monotonically increasing, so this query is guaranteed to return a contiguous range of
694        // blocks ordered by increasing height.
695        //
696        // We order by timestamp _then_ height, because the timestamp order allows the query planner
697        // to use the index on timestamp to also efficiently solve the WHERE clause, but this
698        // process may turn up multiple results, due to the 1-second resolution of block timestamps.
699        // The final sort by height guarantees us a unique, deterministic result (the first block
700        // with a given timestamp). This sort may not be able to use an index, but it shouldn't be
701        // too expensive, since there will never be more than a handful of blocks with the same
702        // timestamp.
703        let sql = format!(
704            "SELECT {HEADER_COLUMNS}
705               FROM header AS h
706              WHERE h.timestamp >= $1 AND h.timestamp < $2
707              ORDER BY h.timestamp, h.height
708              LIMIT $3"
709        );
710        let rows = query(&sql)
711            .bind(start as i64)
712            .bind(end as i64)
713            .bind(limit as i64)
714            .fetch(self.as_mut());
715        let window: Vec<_> = rows
716            .map(|row| parse_header::<Types, _>(row?))
717            .try_collect()
718            .await?;
719
720        let next = if window.len() < limit {
721            // If we are not limited, complete the window by finding the block just after.
722            let sql = format!(
723                "SELECT {HEADER_COLUMNS}
724               FROM header AS h
725              WHERE h.timestamp >= $1
726              ORDER BY h.timestamp, h.height
727              LIMIT 1"
728            );
729            query(&sql)
730                .bind(end as i64)
731                .fetch_optional(self.as_mut())
732                .await?
733                .map(parse_header::<Types, _>)
734                .transpose()?
735        } else {
736            // If we have been limited, return a `null` next block indicating an incomplete window.
737            // The client will have to query again with an adjusted starting point to get subsequent
738            // results.
739            tracing::debug!(limit, "cutting off header window request due to limit");
740            None
741        };
742
743        // If the `next` block exists, _or_ if any block in the window exists, we know we have
744        // enough information to definitively say at least where the window starts (we may or may
745        // not have where it ends, depending on how many blocks have thus far been produced).
746        // However, if we have neither a block in the window nor a block after it, we cannot say
747        // whether the next block produced will have a timestamp before or after the window start.
748        // In this case, we don't know what the `prev` field of the response should be, so we return
749        // an error: the caller must try again after more blocks have been produced.
750        if window.is_empty() && next.is_none() {
751            return Err(QueryError::NotFound);
752        }
753
754        // Find the block just before the window.
755        let sql = format!(
756            "SELECT {HEADER_COLUMNS}
757               FROM header AS h
758              WHERE h.timestamp < $1
759              ORDER BY h.timestamp DESC, h.height DESC
760              LIMIT 1"
761        );
762        let prev = query(&sql)
763            .bind(start as i64)
764            .fetch_optional(self.as_mut())
765            .await?
766            .map(parse_header::<Types, _>)
767            .transpose()?;
768
769        Ok(TimeWindowQueryData { window, prev, next })
770    }
771}
772
773/// Get inclusive start and end bounds for a range to pull aggregate statistics.
774///
775/// Returns [`None`] if there are no blocks in the given range, in which case the result should be
776/// the default value of the aggregate statistic.
777async fn aggregate_range_bounds<Types>(
778    tx: &mut Transaction<impl TransactionMode>,
779    range: impl RangeBounds<usize>,
780) -> QueryResult<Option<(usize, usize)>>
781where
782    Types: NodeType,
783    Header<Types>: QueryableHeader<Types>,
784{
785    let from = match range.start_bound() {
786        Bound::Included(from) => *from,
787        Bound::Excluded(from) => *from + 1,
788        Bound::Unbounded => 0,
789    };
790    let to = match range.end_bound() {
791        Bound::Included(to) => *to,
792        Bound::Excluded(0) => return Ok(None),
793        Bound::Excluded(to) => *to - 1,
794        Bound::Unbounded => {
795            let height = AggregatesStorage::<Types>::aggregates_height(tx)
796                .await
797                .map_err(|err| QueryError::Error {
798                    message: format!("{err:#}"),
799                })?;
800            if height == 0 {
801                return Ok(None);
802            }
803            if height < from {
804                return Ok(None);
805            }
806            height - 1
807        },
808    };
809    Ok(Some((from, to)))
810}
811
#[cfg(test)]
mod test {
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::vid::advz::advz_scheme;
    use itertools::Itertools;
    use jf_advz::VidScheme;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::{
        availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
        data_source::{
            Transaction as _, VersionedDataSource,
            sql::testing::TmpDb,
            storage::{SqlStorage, StorageConnectionType, UpdateAvailabilityStorage},
        },
        testing::mocks::MockTypes,
    };

    /// Check `sync_status_ranges` for leaves over the half-open height range `[start, end)`.
    ///
    /// Generates mock leaves for heights `0..end` and inserts only those in the half-open ranges
    /// listed in `present_ranges`. It then asserts that the reported status is an exact, in-order
    /// sequence of `Missing` gaps and `Present` ranges covering `[start, end)`.
    ///
    /// `present_ranges` must be sorted, disjoint and contained in `[start, end)`. The check expects
    /// exactly one `Present` entry per element of `present_ranges`.
    async fn test_sync_status_ranges(start: usize, end: usize, present_ranges: &[(usize, usize)]) {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // Generate mock leaves for heights `0..end`. Each is a copy of its predecessor with the
        // block number set to its height.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        for i in 1..end {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        // Insert only the leaves in the present ranges.
        {
            let mut tx = db.write().await.unwrap();

            for &(range_start, range_end) in present_ranges {
                for leaf in &leaves[range_start..range_end] {
                    tx.insert_leaf(leaf).await.unwrap();
                }
            }

            tx.commit().await.unwrap();
        }

        let sync_status = db
            .read()
            .await
            .unwrap()
            .sync_status_ranges("leaf2", "height", start, end)
            .await
            .unwrap();

        // Every height in `[start, end)` that was not inserted counts as missing.
        let present: usize = present_ranges
            .iter()
            .map(|(range_start, range_end)| range_end - range_start)
            .sum();
        assert_eq!(sync_status.missing, (end - start) - present);

        // Walk the present ranges in order. Each may be preceded by a `Missing` gap and is
        // followed by its own `Present` range.
        let mut ranges = sync_status.ranges.into_iter();
        let mut cursor = start;
        for &(range_start, range_end) in present_ranges {
            if range_start != cursor {
                assert_eq!(
                    ranges.next().unwrap(),
                    SyncStatusRange {
                        start: cursor,
                        end: range_start,
                        status: SyncStatus::Missing,
                    }
                );
            }
            assert_eq!(
                ranges.next().unwrap(),
                SyncStatusRange {
                    start: range_start,
                    end: range_end,
                    status: SyncStatus::Present,
                }
            );
            cursor = range_end;
        }

        // Everything after the last present range, up to `end`, is missing.
        if cursor != end {
            assert_eq!(
                ranges.next().unwrap(),
                SyncStatusRange {
                    start: cursor,
                    end,
                    status: SyncStatus::Missing,
                }
            );
        }

        assert_eq!(ranges.next(), None);
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_present() {
        test_sync_status_ranges(0, 6, &[(0, 2), (4, 6)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_missing() {
        test_sync_status_ranges(0, 6, &[(2, 4)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_present() {
        test_sync_status_ranges(1, 8, &[(2, 4), (6, 8)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_missing() {
        test_sync_status_ranges(1, 8, &[(4, 6)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_singleton_ranges() {
        test_sync_status_ranges(0, 3, &[(0, 1), (2, 3)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_present() {
        let ranges = (0..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 201, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_missing() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 202, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_present() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 201, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_missing() {
        let ranges = (2..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 202, &ranges).await;
    }

    /// Check sync status when two consecutive blocks are built on the same (empty) payload.
    ///
    /// Block 0 is the genesis block. Block 1 copies the genesis header, increments the block
    /// number, applies `modify_header`, and reuses the genesis payload.
    ///
    /// The check runs in two phases:
    /// 1. Only leaf 0 is inserted; its block and VID common must be reported missing.
    /// 2. Leaf 1 is inserted with its block and VID data. Leaves and VID common must then be
    ///    present at both heights, and blocks must match `expected_blocks`.
    async fn test_sync_status_shared_payload(
        modify_header: impl FnOnce(&mut Header<MockTypes>),
        expected_blocks: ResourceSyncStatus,
    ) {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        let genesis_leaf = LeafQueryData::<MockTypes>::genesis(
            &Default::default(),
            &Default::default(),
            TEST_VERSIONS.test,
        )
        .await;
        let genesis_block = BlockQueryData::<MockTypes>::genesis(
            &Default::default(),
            &Default::default(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let dispersal = vid.disperse([]).unwrap();

        let mut next_leaf = genesis_leaf.clone();
        next_leaf.leaf.block_header_mut().block_number += 1;
        modify_header(next_leaf.leaf.block_header_mut());
        let next_block = BlockQueryData::<MockTypes>::new(
            next_leaf.header().clone(),
            genesis_block.payload().clone(),
        );

        // Insert the first leaf without payload or VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&genesis_leaf).await.unwrap();
            tx.commit().await.unwrap();
        }

        // The block and VID data are missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing,
                pruned_height: None,
            }
        );

        // Insert the second block with all data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&next_leaf).await.unwrap();
            tx.insert_block(&next_block).await.unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::new(
                    next_leaf.header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(&VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Leaves are present at both heights. VID is independent of the interpretation of the
        // payload, so the VID data inserted for the second block is shared by both leaves.
        // Whether block data is shared depends on the header tweak, so that is left to the caller.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: expected_blocks,
                vid_common: present,
                pruned_height: None,
            }
        );
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_duplicate_payload() {
        // Apart from the block number, the second header is identical to genesis. The payload
        // data is therefore shared by both leaves, and nothing is missing.
        test_sync_status_shared_payload(
            |_| {},
            ResourceSyncStatus {
                missing: 0,
                ranges: vec![SyncStatusRange {
                    status: SyncStatus::Present,
                    start: 0,
                    end: 2,
                }],
            },
        )
        .await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_same_payload_different_ns_table() {
        // The two blocks have byte-identical payloads but different metadata, so the payload is
        // interpreted differently. Because metadata (e.g. num_transactions) differs, the payload
        // data cannot be shared, and the first block remains missing.
        test_sync_status_shared_payload(
            |header| header.metadata.num_transactions += 1,
            ResourceSyncStatus {
                missing: 1,
                ranges: vec![
                    SyncStatusRange {
                        status: SyncStatus::Missing,
                        start: 0,
                        end: 1,
                    },
                    SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 1,
                        end: 2,
                    },
                ],
            },
        )
        .await;
    }
}