-
Notifications
You must be signed in to change notification settings - Fork 108
feat: block producer redesign #502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 34 commits
ad707df
555117a
15d2430
256c339
3eae7f1
e9eee63
838da64
9d28bbf
a8601f2
27c5c5a
b80d017
069b5e1
ecdaf8f
3553007
b19957a
7063823
924e7b1
c80a19d
b5eb7be
027dd30
f029b2f
76c9912
474704e
5476f22
e65cf53
2cdabbc
2479db3
acbfa64
b6590c5
5008fdd
b0645d3
7957553
349685c
45627f3
4ba9f24
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,18 @@ | ||
| use std::{cmp::min, collections::BTreeSet, sync::Arc, time::Duration}; | ||
|
|
||
| use async_trait::async_trait; | ||
| use miden_objects::{notes::NoteId, transaction::OutputNote}; | ||
| use tokio::time; | ||
| use std::{cmp::min, collections::BTreeSet, num::NonZeroUsize, sync::Arc, time::Duration}; | ||
|
|
||
| use miden_objects::{ | ||
| notes::NoteId, | ||
| transaction::{OutputNote, TransactionId}, | ||
| }; | ||
| use tokio::{sync::Mutex, time}; | ||
| use tonic::async_trait; | ||
| use tracing::{debug, info, instrument, Span}; | ||
|
|
||
| use crate::{block_builder::BlockBuilder, ProvenTransaction, SharedRwVec, COMPONENT}; | ||
| use crate::{ | ||
| block_builder::BlockBuilder, | ||
| mempool::{BatchJobId, Mempool}, | ||
| ProvenTransaction, SharedRwVec, COMPONENT, | ||
| }; | ||
|
|
||
| #[cfg(test)] | ||
| mod tests; | ||
|
|
@@ -206,3 +213,84 @@ where | |
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| pub struct BatchProducer { | ||
| pub batch_interval: Duration, | ||
| pub workers: NonZeroUsize, | ||
| pub mempool: Arc<Mutex<Mempool>>, | ||
| pub tx_per_batch: usize, | ||
| } | ||
|
|
||
| type BatchResult = Result<BatchJobId, (BatchJobId, BuildBatchError)>; | ||
|
|
||
| /// Wrapper around tokio's JoinSet that remains pending if the set is empty, | ||
| /// instead of returning None. | ||
| struct WorkerPool(tokio::task::JoinSet<BatchResult>); | ||
|
|
||
| impl WorkerPool { | ||
| async fn join_next(&mut self) -> Result<BatchResult, tokio::task::JoinError> { | ||
| if self.0.is_empty() { | ||
| std::future::pending().await | ||
| } else { | ||
| // Cannot be None as its not empty. | ||
| self.0.join_next().await.unwrap() | ||
| } | ||
| } | ||
|
|
||
| fn len(&self) -> usize { | ||
| self.0.len() | ||
| } | ||
|
|
||
| fn spawn(&mut self, id: BatchJobId, transactions: Vec<TransactionId>) { | ||
| self.0.spawn(async move { | ||
| todo!("Do actual work like aggregating transaction data"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the above comment: batch building could also take 3 - 5 seconds. Let's artificially simulate this for now.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wondering if we should also simulate failures. So something like sleep a random period, and then randomly fail a batch every now and then. If we do want this, the failure rate should probably be a configurable parameter. Similar for the entire block. Though this might annoy users if this causes a user tx to expire. And could look bad ito throughput; though I don't think that's an issue right now (?).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is configurable and not too difficult to add, I don't mind doing it. |
||
| }); | ||
| } | ||
| } | ||
|
|
||
| impl BatchProducer { | ||
| pub async fn run(self) { | ||
| let mut interval = tokio::time::interval(self.batch_interval); | ||
| interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); | ||
|
|
||
| let mut inflight = WorkerPool(tokio::task::JoinSet::new()); | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
| _ = interval.tick() => { | ||
| if inflight.len() >= self.workers.get() { | ||
| tracing::info!("All batch workers occupied."); | ||
| continue; | ||
| } | ||
|
|
||
| // Transactions available? | ||
| let Some((batch_id, transactions)) = | ||
| self.mempool.lock().await.select_batch() | ||
| else { | ||
| tracing::info!("No transactions available for batch."); | ||
| continue; | ||
| }; | ||
|
|
||
| inflight.spawn(batch_id, transactions); | ||
| }, | ||
| result = inflight.join_next() => { | ||
| let mut mempool = self.mempool.lock().await; | ||
| match result { | ||
| Err(err) => { | ||
| tracing::warn!(%err, "Batch job panic'd.") | ||
| // TODO: somehow embed the batch ID into the join error, though this doesn't seem possible? | ||
| // mempool.batch_failed(batch_id); | ||
| }, | ||
| Ok(Err((batch_id, err))) => { | ||
| tracing::warn!(%batch_id, %err, "Batch job failed."); | ||
| mempool.batch_failed(batch_id); | ||
| }, | ||
| Ok(Ok(batch_id)) => { | ||
| mempool.batch_proved(batch_id); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just an example. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,12 +8,14 @@ use miden_objects::{ | |
| notes::{NoteHeader, Nullifier}, | ||
| transaction::InputNoteCommitment, | ||
| }; | ||
| use tokio::sync::Mutex; | ||
| use tracing::{debug, info, instrument}; | ||
|
|
||
| use crate::{ | ||
| batch_builder::batch::TransactionBatch, | ||
| errors::BuildBlockError, | ||
| store::{ApplyBlock, Store}, | ||
| mempool::{BatchJobId, Mempool}, | ||
| store::{ApplyBlock, DefaultStore, Store}, | ||
| COMPONENT, | ||
| }; | ||
|
|
||
|
|
@@ -143,3 +145,33 @@ where | |
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| struct BlockProducer { | ||
| pub mempool: Arc<Mutex<Mempool>>, | ||
| pub block_interval: tokio::time::Duration, | ||
| } | ||
|
|
||
| impl BlockProducer { | ||
| pub async fn run(self) { | ||
| let mut interval = tokio::time::interval(self.block_interval); | ||
| interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); | ||
|
|
||
| loop { | ||
| interval.tick().await; | ||
|
|
||
| let (block_number, batches) = self.mempool.lock().await.select_block(); | ||
|
|
||
| let result = self.build_and_commit_block(batches).await; | ||
| let mut mempool = self.mempool.lock().await; | ||
|
|
||
| match result { | ||
| Ok(_) => mempool.block_committed(block_number), | ||
| Err(_) => mempool.block_failed(block_number), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async fn build_and_commit_block(&self, batches: BTreeSet<BatchJobId>) -> Result<(), ()> { | ||
| todo!("Aggregate, prove and commit block"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will contain building the block and saving it to the store, right? For block building, since we don't actually prove anything yet, I am thinking we could put in an artificial delay of 3 - 5 seconds to get the conditions close to the real thing.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. I was also considering injecting failures randomly as mentioned here. |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just an example; more scrutiny should be applied when we fill in the details.