Skip to main content

hotshot_query_service/data_source/
update.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! A generic algorithm for updating a HotShot Query Service data source with new data.
14use std::iter::once;
15
16use anyhow::{Context, ensure};
17use async_trait::async_trait;
18use committable::Committable;
19use futures::future::Future;
20use hotshot::types::EventType;
21use hotshot_types::{
22    data::{
23        Leaf2, VidCommitment, VidCommon, VidDisperseShare, VidShare, ViewNumber,
24        ns_table::parse_ns_table,
25    },
26    event::LeafInfo,
27    new_protocol::CoordinatorEvent,
28    traits::{
29        block_contents::{BlockHeader, BlockPayload, EncodeBytes, GENESIS_VID_NUM_STORAGE_NODES},
30        node_implementation::NodeType,
31    },
32    vid::{
33        advz::advz_scheme,
34        avidm::{AvidMScheme, init_avidm_param},
35        avidm_gf2::{AvidmGf2Scheme, init_avidm_gf2_param},
36    },
37    vote::HasViewNumber,
38};
39use jf_advz::VidScheme;
40
41use crate::{
42    Header, Payload,
43    availability::{
44        BlockInfo, BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload,
45        UpdateAvailabilityData, VidCommonQueryData,
46    },
47    types::HeightIndexed,
48};
49
50/// An extension trait for types which implement the update trait for each API module.
51///
52/// If a type implements [UpdateAvailabilityData] and
53/// [UpdateStatusData](crate::status::UpdateStatusData), then it can be fully kept up to date
54/// through two interfaces:
55/// * [populate_metrics](crate::status::UpdateStatusData::populate_metrics), to get a handle for
56///   populating the status metrics, which should be used when initializing a
57///   [SystemContextHandle](hotshot::types::SystemContextHandle)
58/// * [update](Self::update), provided by this extension trait, to update the query state when a new
59///   HotShot event is emitted
60#[async_trait]
61pub trait UpdateDataSource<Types: NodeType>: UpdateAvailabilityData<Types> {
62    /// Update query state based on consensus event.
63    ///
64    /// The caller is responsible for authenticating `event`. This function does not perform any
65    /// authentication, and if given an invalid `event` (one which does not follow from the latest
66    /// known state of the ledger) it may panic or silently accept the invalid `event`. This allows
67    /// the best possible performance in the case where the query service and the HotShot instance
68    /// are running in the same process (and thus the event stream, directly from HotShot) is
69    /// trusted.
70    ///
71    /// If you want to update the data source with an untrusted event, for example one received from
72    /// a peer over the network, you must authenticate it first.
73    ///
74    ///
75    /// For each decided leaf the query service stores a `BlockInfo` containing the leaf paired
76    /// with a QC that certifies it (`LeafQueryData`), the block payload and VID data when
77    /// available, and only on the **newest** leaf in the batch, it stores the proof that finalizes it:
78    /// a `qc_chain` for legacy protocol set via `BlockInfo::with_qc_chain`, or a `cert2` for new protocol set
79    /// via `BlockInfo::with_cert2`. The two protocols differ only in where those pieces come
80    /// from:
81    ///
82    /// In both events leaves arrive in **newest → oldest** order (`leaves[0]` is the leaf being
83    /// finalized; each subsequent leaf is its ancestor reached via `justify_qc`). For the new
84    /// protocol, `vid_shares` is parallel to `leaves` (same ordering, one share per leaf). The
85    /// handler iterates in reverse so heights are appended ascending.
86    ///
87    /// - **Legacy (`CoordinatorEvent::LegacyEvent` → `EventType::Decide`).** The newest leaf is
88    ///   certified by `committing_qc`; each older leaf is certified by the *next-newer* leaf's
89    ///   `justify_qc`. The newest leaf's `qc_chain` is set to `[committing_qc, deciding_qc]` —
90    ///   the two consecutive QCs that decide it under the legacy 3-chain rule.
91    ///
92    /// - **New protocol (`CoordinatorEvent::NewDecide`).** The newest leaf is certified by
93    ///   `cert1`; older leaves are again certified by the next leaf's `justify_qc`. When a
94    ///   `cert2` is present, it is attached to the newest leaf. Under the new protocol a single
95    ///   `cert2` finalizes that leaf directly, replacing the legacy QC chain.
96    ///
97    /// # Returns
98    ///
99    /// If all provided data is successfully inserted into the database, returns `Ok(())`. If any
100    /// error occurred, the error is logged, and the return value is the height of the first leaf
101    /// which failed to be inserted.
102    async fn update(&self, event: &CoordinatorEvent<Types>) -> Result<(), u64>;
103}
104
105#[async_trait]
106impl<Types: NodeType, T> UpdateDataSource<Types> for T
107where
108    T: UpdateAvailabilityData<Types> + Send + Sync,
109    Header<Types>: QueryableHeader<Types>,
110    Payload<Types>: QueryablePayload<Types>,
111{
112    async fn update(&self, event: &CoordinatorEvent<Types>) -> Result<(), u64> {
113        match event {
114            CoordinatorEvent::LegacyEvent(event) => {
115                let EventType::Decide {
116                    leaf_chain,
117                    committing_qc,
118                    deciding_qc,
119                    ..
120                } = &event.event
121                else {
122                    return Ok(());
123                };
124
125                // `qc` justifies the first (most recent) leaf...
126                let qcs = once(committing_qc.qc().clone())
127                    // ...and each leaf in the chain justifies the subsequent leaf (its parent)
128                    // through `leaf.justify_qc`.
129                    .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc()))
130                    // Put the QCs in chronological order.
131                    .rev()
132                    // The oldest QC is the `justify_qc` of the oldest leaf, which does not justify
133                    // any leaf in the new chain, so we don't need it.
134                    .skip(1);
135                for (
136                    qc2,
137                    LeafInfo {
138                        leaf: leaf2,
139                        vid_share,
140                        state_cert: _,
141                        ..
142                    },
143                ) in qcs.zip(leaf_chain.iter().rev())
144                {
145                    let height = leaf2.block_header().block_number();
146
147                    let leaf_data = match LeafQueryData::new(leaf2.clone(), qc2.clone()) {
148                        Ok(leaf) => leaf,
149                        Err(err) => {
150                            tracing::error!(
151                                height,
152                                ?leaf2,
153                                ?committing_qc,
154                                "inconsistent leaf; cannot append leaf information: {err:#}"
155                            );
156                            return Err(leaf2.block_header().block_number());
157                        },
158                    };
159                    let block_data = leaf2
160                        .block_payload()
161                        .map(|payload| BlockQueryData::new(leaf2.block_header().clone(), payload));
162                    if block_data.is_none() {
163                        tracing::warn!(height, "block payload missing at decide");
164                    }
165
166                    let (vid_common, vid_share) = match vid_share {
167                        Some(VidDisperseShare::V0(share)) => (
168                            Some(VidCommonQueryData::new(
169                                leaf2.block_header().clone(),
170                                VidCommon::V0(share.common.clone()),
171                            )),
172                            Some(VidShare::V0(share.share.clone())),
173                        ),
174                        Some(VidDisperseShare::V1(share)) => (
175                            Some(VidCommonQueryData::new(
176                                leaf2.block_header().clone(),
177                                VidCommon::V1(share.common.clone()),
178                            )),
179                            Some(VidShare::V1(share.share.clone())),
180                        ),
181                        Some(VidDisperseShare::V2(share)) => (
182                            Some(VidCommonQueryData::new(
183                                leaf2.block_header().clone(),
184                                VidCommon::V2(share.common.clone()),
185                            )),
186                            Some(VidShare::V2(share.share.clone())),
187                        ),
188                        None => {
189                            if leaf2.view_number() == ViewNumber::genesis() {
190                                // HotShot does not run VID in consensus for the genesis block. In
191                                // this case, the block payload is guaranteed to always be empty, so
192                                // VID isn't really necessary. But for consistency, we will still
193                                // store the VID dispersal data, computing it ourselves based on the
194                                // well-known genesis VID commitment.
195                                match genesis_vid(leaf2) {
196                                    Ok((common, share)) => (Some(common), Some(share)),
197                                    Err(err) => {
198                                        tracing::warn!("failed to compute genesis VID: {err:#}");
199                                        (None, None)
200                                    },
201                                }
202                            } else {
203                                (None, None)
204                            }
205                        },
206                    };
207
208                    if vid_common.is_none() {
209                        tracing::info!(height, "VID not available at decide");
210                    }
211
212                    let mut info = BlockInfo::new(leaf_data, block_data, vid_common, vid_share);
213                    if let Some(deciding_qc) = deciding_qc
214                        && committing_qc.view_number() == info.leaf.leaf().view_number()
215                    {
216                        let qc_chain =
217                            [committing_qc.as_ref().clone(), deciding_qc.as_ref().clone()];
218                        info = info.with_qc_chain(qc_chain);
219                    }
220                    if let Err(err) = self.append(info).await {
221                        tracing::error!(height, "failed to append leaf information: {err:#}");
222                        return Err(leaf2.block_header().block_number());
223                    }
224                }
225            },
226            CoordinatorEvent::NewDecide {
227                leaf_infos,
228                cert1,
229                cert2,
230            } => {
231                let Some(first) = leaf_infos.first() else {
232                    tracing::error!("new decide event contained no leaves");
233                    return Ok(());
234                };
235                let first_leaf = &first.leaf;
236
237                if let Some(cert2) = cert2
238                    && cert2.data.leaf_commit != Committable::commit(first_leaf)
239                {
240                    tracing::error!(
241                        height = first_leaf.height(),
242                        cert2_leaf = %cert2.data.leaf_commit,
243                        newest_leaf = %Committable::commit(first_leaf),
244                        "new decide event cert2 does not certify the newest leaf"
245                    );
246                    return Err(first_leaf.height());
247                }
248
249                // `cert1` certifies the newest leaf; each newer leaf's justify_qc
250                // certifies the next older leaf.
251                let certifying_qcs = once(cert1.clone())
252                    .chain(leaf_infos.iter().map(|info| info.leaf.justify_qc()))
253                    .take(leaf_infos.len())
254                    .collect::<Vec<_>>();
255
256                for (index, (info, qc)) in leaf_infos.iter().zip(certifying_qcs).enumerate().rev() {
257                    let leaf = &info.leaf;
258                    let height = leaf.block_header().block_number();
259
260                    let leaf_data = match LeafQueryData::new(leaf.clone(), qc) {
261                        Ok(leaf) => leaf,
262                        Err(err) => {
263                            tracing::error!(
264                                height,
265                                ?leaf,
266                                "inconsistent leaf; cannot append leaf information: {err:#}"
267                            );
268                            return Err(height);
269                        },
270                    };
271
272                    let block_data = leaf
273                        .block_payload()
274                        .map(|payload| BlockQueryData::new(leaf.block_header().clone(), payload));
275                    if block_data.is_none() {
276                        tracing::warn!(height, "block payload missing at decide");
277                    }
278
279                    // Extract VID common data from the new protocol's VidDisperseShare2.
280                    let (vid_common, vid_share) = match &info.vid_share {
281                        Some(VidDisperseShare::V2(share)) => (
282                            Some(VidCommonQueryData::new(
283                                leaf.block_header().clone(),
284                                VidCommon::V2(share.common.clone()),
285                            )),
286                            Some(VidShare::V2(share.share.clone())),
287                        ),
288                        Some(_) => (None, None),
289                        None => {
290                            if leaf.view_number() == ViewNumber::genesis() {
291                                // HotShot does not run VID in consensus for the genesis block. In
292                                // this case, the block payload is guaranteed to always be empty, so
293                                // VID isn't really necessary. But for consistency, we will still
294                                // store the VID dispersal data, computing it ourselves based on the
295                                // well-known genesis VID commitment.
296                                match genesis_vid(leaf) {
297                                    Ok((common, share)) => (Some(common), Some(share)),
298                                    Err(err) => {
299                                        tracing::warn!("failed to compute genesis VID: {err:#}");
300                                        (None, None)
301                                    },
302                                }
303                            } else {
304                                (None, None)
305                            }
306                        },
307                    };
308
309                    if vid_common.is_none() {
310                        tracing::info!(height, "VID not available at decide");
311                    }
312
313                    let mut info = BlockInfo::new(leaf_data, block_data, vid_common, vid_share);
314
315                    // Attach `cert2` only to the newest leaf in the batch (`leaves[0]`, which is
316                    // `index == 0` since we iterate in reverse). Under the new protocol a single
317                    // `cert2` finalizes that leaf directly
318                    // older leaves in the batch are finalized using indirect rule
319                    if index == 0
320                        && let Some(cert2) = &cert2
321                    {
322                        info = info.with_cert2(cert2.clone());
323                    }
324
325                    if let Err(err) = self.append(info).await {
326                        tracing::error!(height, "failed to append leaf information: {err:#}");
327                        return Err(height);
328                    }
329                }
330            },
331            CoordinatorEvent::BlockPayloadReconstructed {
332                header, payload, ..
333            } => {
334                let block = BlockQueryData::new(header.clone(), payload.clone());
335                let height = block.height();
336                if let Err(err) = self.append_payload(block).await {
337                    tracing::error!(height, "failed to store reconstructed payload: {err:#}");
338                    return Err(height);
339                }
340            },
341            _ => {},
342        }
343        Ok(())
344    }
345}
346
347fn genesis_vid<Types: NodeType>(
348    leaf: &Leaf2<Types>,
349) -> anyhow::Result<(VidCommonQueryData<Types>, VidShare)> {
350    let payload = Payload::<Types>::empty().0;
351    let bytes = payload.encode();
352
353    match leaf.block_header().payload_commitment() {
354        VidCommitment::V0(commit) => {
355            let mut disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
356                .disperse(bytes)
357                .context("unable to compute VID dispersal for genesis block")?;
358
359            ensure!(
360                disperse.commit == commit,
361                "computed VID commit {} for genesis block does not match header commit {}",
362                disperse.commit,
363                commit
364            );
365            Ok((
366                VidCommonQueryData::new(
367                    leaf.block_header().clone(),
368                    VidCommon::V0(disperse.common),
369                ),
370                VidShare::V0(disperse.shares.remove(0)),
371            ))
372        },
373        VidCommitment::V1(commit) => {
374            let avidm_param = init_avidm_param(GENESIS_VID_NUM_STORAGE_NODES)?;
375            let weights = vec![1; GENESIS_VID_NUM_STORAGE_NODES];
376            let ns_table = parse_ns_table(bytes.len(), &leaf.block_header().metadata().encode());
377
378            let (calculated_commit, mut shares) =
379                AvidMScheme::ns_disperse(&avidm_param, &weights, &bytes, ns_table).unwrap();
380
381            ensure!(
382                calculated_commit == commit,
383                "computed VID commit {} for genesis block does not match header commit {}",
384                calculated_commit,
385                commit
386            );
387
388            Ok((
389                VidCommonQueryData::new(leaf.block_header().clone(), VidCommon::V1(avidm_param)),
390                VidShare::V1(shares.remove(0)),
391            ))
392        },
393        VidCommitment::V2(commit) => {
394            let avidm_gf2_param = init_avidm_gf2_param(GENESIS_VID_NUM_STORAGE_NODES)?;
395            let weights = vec![1; GENESIS_VID_NUM_STORAGE_NODES];
396            let ns_table = parse_ns_table(bytes.len(), &leaf.block_header().metadata().encode());
397
398            let (calculated_commit, common, mut shares) =
399                AvidmGf2Scheme::ns_disperse(&avidm_gf2_param, &weights, &bytes, ns_table).unwrap();
400
401            ensure!(
402                calculated_commit == commit,
403                "computed VID commit {} for genesis block does not match header commit {}",
404                calculated_commit,
405                commit
406            );
407
408            Ok((
409                VidCommonQueryData::new(leaf.block_header().clone(), VidCommon::V2(common)),
410                VidShare::V2(shares.remove(0)),
411            ))
412        },
413    }
414}
415
416/// A data source with an atomic transaction-based synchronization interface.
417///
418/// Changes are made to a versioned data source through a [`Transaction`]. Any changes made in a
419/// [`Transaction`] are initially visible only when queried through that same [`Transaction`]. They
420/// are not immediately written back to storage, which means that a new data source object opened
421/// against the same persistent storage will not reflect the changes. In particular, this means that
422/// if the process restarts and reopens its storage, uncommitted changes will be lost.
423///
424/// Only when a [`Transaction`] is committed are changes written back to storage, synchronized with
425/// any concurrent changes, and made visible to other connections to the same data source.
426pub trait VersionedDataSource: Send + Sync {
427    /// A transaction which can read and modify the data source.
428    type Transaction<'a>: Transaction
429    where
430        Self: 'a;
431
432    type ReadOnly<'a>: Transaction
433    where
434        Self: 'a;
435
436    /// Start an atomic transaction on the data source.
437    fn write(&self) -> impl Future<Output = anyhow::Result<Self::Transaction<'_>>> + Send;
438
439    /// Start a read-only transaction on the data source.
440    ///
441    /// A read-only transaction allows the owner to string together multiple queries of the data
442    /// source, which otherwise would not be atomic with respect to concurrent writes, in an atomic
443    /// fashion. Upon returning, [`read`](Self::read) locks in a fully consistent snapshot of the
444    /// data source, and any read operations performed upon the transaction thereafter read from the
445    /// same consistent snapshot. Concurrent modifications to the data source may occur (for
446    /// example, from concurrent [`write`](Self::write) transactions being committed), but their
447    /// results will not be reflected in a successful read-only transaction which was opened before
448    /// the write was committed.
449    ///
450    /// Read-only transactions do not need to be committed, and reverting has no effect.
451    fn read(&self) -> impl Future<Output = anyhow::Result<Self::ReadOnly<'_>>> + Send;
452}
453
454/// A unit of atomicity for updating a shared data source.
455///
456/// The methods provided by this trait can be used to write such pending changes back to persistent
457/// storage ([commit](Self::commit)) so that they become visible to other clients of the same
458/// underlying storage, and are saved if the process restarts. It also allows pending changes to be
459/// rolled back ([revert](Self::revert)) so that they are never written back to storage and are no
460/// longer reflected even through the data source object which was used to make the changes.
461pub trait Transaction: Send + Sync + Sized {
462    fn commit(self) -> impl Future<Output = anyhow::Result<()>> + Send;
463    fn revert(self) -> impl Future + Send;
464}