Skip to main content

hotshot_task_impls/
transactions.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{StreamExt, stream::FuturesUnordered};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18    consensus::OuterConsensus,
19    data::{EpochNumber, PackedBundle, VidCommitment, ViewNumber, null_block},
20    epoch_membership::EpochMembershipCoordinator,
21    event::{Event, EventType},
22    message::UpgradeLock,
23    traits::{
24        BlockPayload,
25        block_contents::{BlockHeader, BuilderFee, EncodeBytes},
26        node_implementation::NodeType,
27        signature_key::{BuilderSignatureKey, SignatureKey},
28    },
29    utils::{ViewInner, is_epoch_transition, is_last_block},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::Version;
35use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
36
37use crate::{
38    builder::v0_1::BuilderClient as BuilderClientBase,
39    events::{HotShotEvent, HotShotTaskCompleted},
40    helpers::broadcast_event,
41};
42
// Parameters for builder querying algorithm

/// Numerator of the fraction of builders that make up the first ("main") batch.
/// Together with the divisor below, the main batch size is
/// `ceil(num_builders * DIVIDEND / DIVISOR)`.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
/// Denominator of the fraction of builders that make up the first ("main") batch.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
/// Time the first batch of builders has to respond. The cutoff is only checked after a
/// response arrives; once exceeded, no further main-batch responses are awaited.
const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
/// Multiplier applied to the time spent on the first batch to obtain the extra time given
/// to the second batch of builders.
const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
/// Minimum amount of time allotted to both batches, cannot be cut shorter if the first batch
/// responds extremely fast.
const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
/// Delay between re-tries on unsuccessful calls (used both when looking up the parent VID
/// commitment and when asking the builders for a block).
const RETRY_DELAY: Duration = Duration::from_millis(100);
58
/// A block successfully claimed from a builder, together with the fee the builder offered
/// for it.
pub struct BuilderResponse<TYPES: NodeType> {
    /// Fee information: the offered amount plus the builder's account and fee signature
    pub fee: BuilderFee<TYPES>,

    /// Block payload returned by the builder
    pub block_payload: TYPES::BlockPayload,

    /// Block metadata returned alongside the payload
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
}
70
/// Tracks state of a Transaction task
pub struct TransactionTaskState<TYPES: NodeType> {
    /// Overall time budget for obtaining a block for one view: covers both the lookup of
    /// the parent VID commitment and the builder queries.
    pub builder_timeout: Duration,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// View number this view is executing in.
    pub cur_view: ViewNumber,

    /// Epoch number this node is executing in.
    pub cur_epoch: Option<EpochNumber>,

    /// Reference to consensus. The leader takes read locks on this to inspect QCs, the
    /// validated state map and saved leaves, and a write lock to update metrics.
    pub consensus: OuterConsensus<TYPES>,

    /// Membership for the quorum; used to look up the leader of a view and the stake table
    /// of an epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Builder 0.1 API clients
    pub builder_clients: Vec<BuilderClientBase<TYPES>>,

    /// This Nodes Public Key
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key; used to sign the parent commitment and block hashes sent to builders
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// InstanceState
    // NOTE(review): not read by any method visible in this file.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// This state's ID (recorded in tracing spans)
    pub id: u64,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,
}
112
113impl<TYPES: NodeType> TransactionTaskState<TYPES> {
114    /// handle view change decide legacy or not
115    pub async fn handle_view_change(
116        &mut self,
117        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
118        block_view: ViewNumber,
119        block_epoch: Option<EpochNumber>,
120        vid: Option<VidCommitment>,
121    ) -> Option<HotShotTaskCompleted> {
122        self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid)
123            .await
124    }
125
126    /// legacy view change handler
127    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
128    pub async fn handle_view_change_legacy(
129        &mut self,
130        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
131        block_view: ViewNumber,
132        block_epoch: Option<EpochNumber>,
133        vid: Option<VidCommitment>,
134    ) -> Option<HotShotTaskCompleted> {
135        let version = match self.upgrade_lock.version(block_view) {
136            Ok(v) => v,
137            Err(err) => {
138                tracing::error!(
139                    "Upgrade certificate requires unsupported version, refusing to request \
140                     blocks: {err}"
141                );
142                return None;
143            },
144        };
145
146        // Short circuit if we are in epochs and we are likely proposing a transition block
147        // If it's the first view of the upgrade, we don't need to check for transition blocks
148        if version >= EPOCH_VERSION {
149            let Some(epoch) = block_epoch else {
150                tracing::error!("Epoch is required for epoch-based view change");
151                return None;
152            };
153            let high_qc = self.consensus.read().await.high_qc().clone();
154            let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
155                bn
156            } else {
157                // If it's the first view after the upgrade the high QC won't have a block number
158                // So just use the highest_block number we've stored
159                if block_view
160                    > self
161                        .upgrade_lock
162                        .upgrade_view()
163                        .unwrap_or(ViewNumber::new(0))
164                        + 1
165                {
166                    tracing::warn!("High QC in epoch version and not the first QC after upgrade");
167                    self.send_empty_block(event_stream, block_view, block_epoch, version)
168                        .await;
169                    return None;
170                }
171                // 0 here so we use the highest block number in the calculation below
172                0
173            };
174            high_qc_block_number = std::cmp::max(
175                high_qc_block_number,
176                self.consensus.read().await.highest_block,
177            );
178            if self
179                .consensus
180                .read()
181                .await
182                .transition_qc()
183                .is_some_and(|qc| {
184                    let Some(e) = qc.0.data.epoch else {
185                        return false;
186                    };
187                    e == epoch
188                })
189                || is_epoch_transition(high_qc_block_number, self.epoch_height)
190            {
191                // We are proposing a transition block it should be empty
192                if !is_last_block(high_qc_block_number, self.epoch_height) {
193                    tracing::info!(
194                        "Sending empty block event. View number: {block_view}. Parent Block \
195                         number: {high_qc_block_number}"
196                    );
197                    self.send_empty_block(event_stream, block_view, block_epoch, version)
198                        .await;
199                    return None;
200                }
201            }
202        }
203
204        // Request a block from the builder unless we are between versions.
205        let block = {
206            if self
207                .upgrade_lock
208                .decided_upgrade_cert()
209                .as_ref()
210                .is_some_and(|cert| cert.upgrading_in(block_view))
211            {
212                None
213            } else {
214                self.wait_for_block(block_view, vid).await
215            }
216        };
217
218        if let Some(BuilderResponse {
219            block_payload,
220            metadata,
221            fee,
222        }) = block
223        {
224            broadcast_event(
225                Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
226                    block_payload.encode(),
227                    metadata,
228                    block_view,
229                    block_epoch,
230                    vec1::vec1![fee],
231                ))),
232                event_stream,
233            )
234            .await;
235        } else {
236            self.send_empty_block(event_stream, block_view, block_epoch, version)
237                .await;
238        };
239
240        return None;
241    }
242
243    /// Send the event to the event stream that we are proposing an empty block
244    async fn send_empty_block(
245        &self,
246        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
247        block_view: ViewNumber,
248        block_epoch: Option<EpochNumber>,
249        version: Version,
250    ) {
251        // If we couldn't get a block, send an empty block
252        tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
253
254        // Increment the metric for number of empty blocks proposed
255        self.consensus
256            .write()
257            .await
258            .metrics
259            .number_of_empty_blocks_proposed
260            .add(1);
261
262        let num_storage_nodes = match self
263            .membership_coordinator
264            .stake_table_for_epoch(block_epoch)
265        {
266            Ok(epoch_stake_table) => epoch_stake_table.total_nodes(),
267            Err(e) => {
268                tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
269                return;
270            },
271        };
272
273        let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
274            tracing::error!("Failed to get null fee");
275            return;
276        };
277
278        // Create an empty block payload and metadata
279        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
280
281        // Broadcast the empty block
282        broadcast_event(
283            Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
284                vec![].into(),
285                metadata,
286                block_view,
287                block_epoch,
288                vec1::vec1![null_fee],
289            ))),
290            event_stream,
291        )
292        .await;
293    }
294
295    /// Produce a null block
296    pub async fn null_block(
297        &self,
298        block_view: ViewNumber,
299        block_epoch: Option<EpochNumber>,
300        version: Version,
301        num_storage_nodes: usize,
302    ) -> Option<PackedBundle<TYPES>> {
303        let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
304            tracing::error!("Failed to calculate null block fee.");
305            return None;
306        };
307
308        // Create an empty block payload and metadata
309        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
310
311        Some(PackedBundle::new(
312            vec![].into(),
313            metadata,
314            block_view,
315            block_epoch,
316            vec1::vec1![null_fee],
317        ))
318    }
319
320    /// main task event handler
321    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
322    pub async fn handle(
323        &mut self,
324        event: Arc<HotShotEvent<TYPES>>,
325        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
326    ) -> Result<()> {
327        match event.as_ref() {
328            HotShotEvent::TransactionsRecv(transactions) => {
329                broadcast_event(
330                    Event {
331                        view_number: self.cur_view,
332                        event: EventType::Transactions {
333                            transactions: transactions.clone(),
334                        },
335                    },
336                    &self.output_event_stream,
337                )
338                .await;
339            },
340            HotShotEvent::ViewChange(view, epoch) => {
341                let view = ViewNumber::new(std::cmp::max(1, **view));
342                ensure!(
343                    *view > *self.cur_view && *epoch >= self.cur_epoch,
344                    debug!(
345                        "Received a view change to an older view and epoch: tried to change view \
346                         to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
347                        self.cur_view, self.cur_epoch
348                    )
349                );
350                self.cur_view = view;
351                self.cur_epoch = *epoch;
352
353                let leader = self
354                    .membership_coordinator
355                    .membership_for_epoch(*epoch)?
356                    .leader(view)?;
357                if leader == self.public_key {
358                    self.handle_view_change(&event_stream, view, *epoch, None)
359                        .await;
360                    return Ok(());
361                }
362            },
363            HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
364                let view_number = proposal.data.view_number();
365                let next_view = view_number + 1;
366
367                let version = match self.upgrade_lock.version(next_view) {
368                    Ok(v) => v,
369                    Err(e) => {
370                        tracing::error!("Failed to calculate version: {e:?}");
371                        return Ok(());
372                    },
373                };
374
375                if version < DRB_AND_HEADER_UPGRADE_VERSION {
376                    return Ok(());
377                }
378
379                let vid = proposal.data.block_header().payload_commitment();
380                let block_height = proposal.data.block_header().block_number();
381                if is_epoch_transition(block_height, self.epoch_height) {
382                    return Ok(());
383                }
384                if next_view <= self.cur_view {
385                    return Ok(());
386                }
387                // move to next view for this task only
388                self.cur_view = next_view;
389
390                let leader = self
391                    .membership_coordinator
392                    .membership_for_epoch(self.cur_epoch)?
393                    .leader(next_view)?;
394                if leader == self.public_key {
395                    self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid))
396                        .await;
397                    return Ok(());
398                }
399            },
400            _ => {},
401        }
402        Ok(())
403    }
404
405    /// Get VID commitment for the last successful view before `block_view`.
406    /// Returns None if we don't have said commitment recorded.
407    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
408    async fn last_vid_commitment_retry(
409        &self,
410        block_view: ViewNumber,
411        task_start_time: Instant,
412    ) -> Result<(ViewNumber, VidCommitment)> {
413        loop {
414            match self.last_vid_commitment(block_view).await {
415                Ok((view, comm)) => break Ok((view, comm)),
416                Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
417                _ => {
418                    // We still have time, will re-try in a bit
419                    sleep(RETRY_DELAY).await;
420                    continue;
421                },
422            }
423        }
424    }
425
426    /// Get VID commitment for the last successful view before `block_view`.
427    /// Returns None if we don't have said commitment recorded.
428    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
429    async fn last_vid_commitment(
430        &self,
431        block_view: ViewNumber,
432    ) -> Result<(ViewNumber, VidCommitment)> {
433        let consensus_reader = self.consensus.read().await;
434        let mut target_view = ViewNumber::new(block_view.saturating_sub(1));
435
436        loop {
437            let view_data = consensus_reader
438                .validated_state_map()
439                .get(&target_view)
440                .context(info!(
441                    "Missing record for view {target_view} in validated state",
442                ))?;
443
444            match &view_data.view_inner {
445                ViewInner::Da {
446                    payload_commitment, ..
447                } => return Ok((target_view, *payload_commitment)),
448                ViewInner::Leaf {
449                    leaf: leaf_commitment,
450                    ..
451                } => {
452                    let leaf = consensus_reader
453                        .saved_leaves()
454                        .get(leaf_commitment)
455                        .context(info!(
456                            "Missing leaf with commitment {leaf_commitment} for view \
457                             {target_view} in saved_leaves",
458                        ))?;
459                    return Ok((target_view, leaf.payload_commitment()));
460                },
461                ViewInner::Failed => {
462                    // For failed views, backtrack
463                    target_view = ViewNumber::new(target_view.checked_sub(1).context(warn!(
464                        "Reached genesis. Something is wrong -- have we not decided any blocks \
465                         since genesis?"
466                    ))?);
467                    continue;
468                },
469            }
470        }
471    }
472
473    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
474    async fn wait_for_block(
475        &self,
476        block_view: ViewNumber,
477        vid: Option<VidCommitment>,
478    ) -> Option<BuilderResponse<TYPES>> {
479        let task_start_time = Instant::now();
480
481        // Find commitment to the block we want to build upon
482        let (parent_view, parent_comm) = if let Some(vid) = vid {
483            (block_view - 1, vid)
484        } else {
485            match self
486                .last_vid_commitment_retry(block_view, task_start_time)
487                .await
488            {
489                Ok((v, c)) => (v, c),
490                Err(e) => {
491                    tracing::warn!("Failed to find last vid commitment in time: {e}");
492                    return None;
493                },
494            }
495        };
496
497        let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
498            &self.private_key,
499            parent_comm.as_ref(),
500        ) {
501            Ok(sig) => sig,
502            Err(err) => {
503                tracing::error!(%err, "Failed to sign block hash");
504                return None;
505            },
506        };
507
508        while task_start_time.elapsed() < self.builder_timeout {
509            match timeout(
510                self.builder_timeout
511                    .saturating_sub(task_start_time.elapsed()),
512                self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
513            )
514            .await
515            {
516                // We got a block
517                Ok(Ok(block)) => {
518                    return Some(block);
519                },
520
521                // We failed to get a block
522                Ok(Err(err)) => {
523                    tracing::info!("Couldn't get a block: {err:#}");
524                    // pause a bit
525                    sleep(RETRY_DELAY).await;
526                    continue;
527                },
528
529                // We timed out while getting available blocks
530                Err(err) => {
531                    tracing::info!(%err, "Timeout while getting available blocks");
532                    return None;
533                },
534            }
535        }
536
537        tracing::warn!("could not get a block from the builder in time");
538        None
539    }
540
541    /// Query the builders for available blocks. Queries only fraction of the builders
542    /// based on the response time.
543    async fn get_available_blocks(
544        &self,
545        parent_comm: VidCommitment,
546        view_number: ViewNumber,
547        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
548    ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
549        let tasks = self
550            .builder_clients
551            .iter()
552            .enumerate()
553            .map(|(builder_idx, client)| async move {
554                client
555                    .available_blocks(
556                        parent_comm,
557                        view_number.u64(),
558                        self.public_key.clone(),
559                        parent_comm_sig,
560                    )
561                    .await
562                    .map(move |blocks| {
563                        blocks
564                            .into_iter()
565                            .map(move |block_info| (block_info, builder_idx))
566                    })
567            })
568            .collect::<FuturesUnordered<_>>();
569        let mut results = Vec::with_capacity(self.builder_clients.len());
570        let query_start = Instant::now();
571        let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
572            .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
573        let mut tasks = tasks.take(threshold);
574        while let Some(result) = tasks.next().await {
575            results.push(result);
576            if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
577                break;
578            }
579        }
580        let timeout = sleep(std::cmp::max(
581            query_start
582                .elapsed()
583                .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
584            BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
585        ));
586        futures::pin_mut!(timeout);
587        let mut tasks = tasks.into_inner().take_until(timeout);
588        while let Some(result) = tasks.next().await {
589            results.push(result);
590        }
591        results
592            .into_iter()
593            .filter_map(|result| result.ok())
594            .flatten()
595            .collect::<Vec<_>>()
596    }
597
598    /// Get a block from builder.
599    /// Queries the sufficiently fast builders for available blocks and chooses the one with the
600    /// best fee/byte ratio, re-trying with the next best one in case of failure.
601    ///
602    /// # Errors
603    /// If none of the builder reports any available blocks or claiming block fails for all of the
604    /// builders.
605    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
606    async fn block_from_builder(
607        &self,
608        parent_comm: VidCommitment,
609        view_number: ViewNumber,
610        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
611    ) -> Result<BuilderResponse<TYPES>> {
612        let mut available_blocks = self
613            .get_available_blocks(parent_comm, view_number, parent_comm_sig)
614            .await;
615
616        available_blocks.sort_by(|(l, _), (r, _)| {
617            // We want the block with the highest fee per byte of data we're going to have to
618            // process, thus our comparison function is:
619            //      (l.offered_fee / l.block_size) < (r.offered_fee / r.block_size)
620            // To avoid floating point math (which doesn't even have an `Ord` impl) we multiply
621            // through by the denominators to get
622            //      l.offered_fee * r.block_size < r.offered_fee * l.block_size
623            // We cast up to u128 to avoid overflow.
624            (u128::from(l.offered_fee) * u128::from(r.block_size))
625                .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
626        });
627
628        if available_blocks.is_empty() {
629            tracing::info!("No available blocks");
630            bail!("No available blocks");
631        }
632
633        for (block_info, builder_idx) in available_blocks {
634            // Verify signature over chosen block.
635            if !block_info.sender.validate_block_info_signature(
636                &block_info.signature,
637                block_info.block_size,
638                block_info.offered_fee,
639                &block_info.block_hash,
640            ) {
641                tracing::warn!("Failed to verify available block info response message signature");
642                continue;
643            }
644
645            let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
646                &self.private_key,
647                block_info.block_hash.as_ref(),
648            ) {
649                Ok(request_signature) => request_signature,
650                Err(err) => {
651                    tracing::error!(%err, "Failed to sign block hash");
652                    continue;
653                },
654            };
655
656            let response = {
657                let client = &self.builder_clients[builder_idx];
658
659                let (block, either_header_input) = futures::join! {
660                    client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
661                    client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
662                };
663
664                let block_data = match block {
665                    Ok(block_data) => block_data,
666                    Err(err) => {
667                        tracing::warn!(%err, "Error claiming block data");
668                        continue;
669                    },
670                };
671
672                let Ok(either_header_input) = either_header_input
673                    .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
674                else {
675                    continue;
676                };
677
678                let Some(header_input) = either_header_input
679                    .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
680                else {
681                    tracing::warn!(
682                        "Failed to verify available new or legacy block header input data \
683                         response message signature"
684                    );
685                    continue;
686                };
687
688                // verify the signature over the message
689                if !block_data.validate_signature() {
690                    tracing::warn!(
691                        "Failed to verify available block data response message signature"
692                    );
693                    continue;
694                }
695
696                let fee = BuilderFee {
697                    fee_amount: block_info.offered_fee,
698                    fee_account: header_input.sender,
699                    fee_signature: header_input.fee_signature,
700                };
701
702                BuilderResponse {
703                    fee,
704                    block_payload: block_data.block_payload,
705                    metadata: block_data.metadata,
706                }
707            };
708
709            return Ok(response);
710        }
711
712        bail!("Couldn't claim a block from any of the builders");
713    }
714}
715
716#[async_trait]
717/// task state implementation for Transactions Task
718impl<TYPES: NodeType> TaskState for TransactionTaskState<TYPES> {
719    type Event = HotShotEvent<TYPES>;
720
721    async fn handle_event(
722        &mut self,
723        event: Arc<Self::Event>,
724        sender: &Sender<Arc<Self::Event>>,
725        _receiver: &Receiver<Arc<Self::Event>>,
726    ) -> Result<()> {
727        if self.upgrade_lock.new_protocol_active(self.cur_view) {
728            return Ok(());
729        }
730        self.handle(event, sender.clone()).await
731    }
732
733    fn cancel_subtasks(&mut self) {}
734}