Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions based/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions based/crates/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ thiserror.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
reth_optimism_consensus = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", package = "reth-optimism-consensus" }
2 changes: 1 addition & 1 deletion based/crates/reth/src/api/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ where
if self.use_unsealed_state(&number) &&
let Some(unsealed_block) = self.unsealed_block.load_full()
{
Ok(Some(unsealed_block.to_block(full)))
Ok(Some(unsealed_block.to_rpc_block(full)))
} else {
EthBlocks::rpc_block(&self.canonical, number.into(), full).await.map_err(Into::into)
}
Expand Down
25 changes: 14 additions & 11 deletions based/crates/reth/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use std::{sync::Arc, time::Instant};

use alloy_consensus::Header;
use alloy_primitives::B256;
use alloy_rpc_types::Block;
use arc_swap::ArcSwapOption;
use bop_common::{
p2p::{EnvV0, FragV0, SealV0},
typedefs::OpBlock,
};
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_optimism_chainspec::OpHardforks;
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_storage_api::{
BlockReaderIdExt, BlockWriter, CanonChainTracker, DatabaseProviderFactory, StateProviderFactory,
};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info};

Expand All @@ -23,6 +24,7 @@ use crate::{
/// Result of submitting a frag to the driver.
#[derive(Debug, Clone, Copy)]
pub enum FragStatus {
Ignored,
Valid,
Invalid,
}
Expand Down Expand Up @@ -99,9 +101,12 @@ impl Driver {
where
Client: StateProviderFactory
+ ChainSpecProvider<ChainSpec: EthChainSpec<Header = Header> + OpHardforks>
+ BlockReaderIdExt<Header = Header>
+ BlockReaderIdExt<Header = Header, Block = reth_optimism_primitives::OpBlock>
+ CanonChainTracker<Header = Header>
+ DatabaseProviderFactory
+ Clone
+ 'static,
<Client as DatabaseProviderFactory>::ProviderRW: BlockWriter<Block = reth_optimism_primitives::OpBlock>,
{
let executor = StateExecutor::new(client);
let current_unsealed_block = executor.shared_unsealed_block();
Expand Down Expand Up @@ -244,7 +249,7 @@ impl<E: UnsealedExecutor> DriverInner<E> {

if frag.block_number < ub.env.number {
info!(frag_block = frag.block_number, env_number = ub.env.number, "stale frag (older block), ignoring");
return Ok(FragStatus::Valid);
return Ok(FragStatus::Ignored);
}

if let Err(e) = ub.validate_new_frag(&frag) {
Expand All @@ -266,7 +271,7 @@ impl<E: UnsealedExecutor> DriverInner<E> {

if ub.last_frag().is_some_and(|f| f.is_last) {
info!("last frag received, pre-sealing block");
if let Err(e) = self.exec.seal().await {
if let Err(e) = self.exec.seal() {
error!(error = %e, "seal failed, discarding unsealed block");
self.reset_current_unsealed_block();
return Err(DriverError::from(e));
Expand All @@ -287,9 +292,7 @@ impl<E: UnsealedExecutor> DriverInner<E> {
return Ok(());
}

let presealed_block = self.exec.get_block(seal.block_hash, seal.block_number).await;

let presealed_block = match presealed_block {
let presealed_block = match self.exec.get_block(seal.block_hash) {
Ok(b) => b,
Err(e) => {
self.reset_current_unsealed_block();
Expand All @@ -299,7 +302,7 @@ impl<E: UnsealedExecutor> DriverInner<E> {

self.validate_seal_frag_v0(&presealed_block, ub.as_ref(), &seal)?;

self.exec.set_canonical(&presealed_block).await?;
self.exec.set_canonical(&presealed_block)?;

self.reset_current_unsealed_block();

Expand All @@ -309,11 +312,11 @@ impl<E: UnsealedExecutor> DriverInner<E> {

fn validate_seal_frag_v0(
&self,
presealed_block: &Block,
presealed_block: &OpBlock,
ub: &UnsealedBlock,
seal: &SealV0,
) -> Result<(), ValidateSealError> {
let expected_block_hash: B256 = presealed_block.header.hash.into();
let expected_block_hash: B256 = presealed_block.header.hash_slow();
if expected_block_hash != seal.block_hash {
return Err(ValidateSealError::BlockHash { expected: expected_block_hash, got: seal.block_hash });
}
Expand Down
6 changes: 6 additions & 0 deletions based/crates/reth/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub enum UnsealedBlockError {

#[error("received frag after last frag already accepted")]
AlreadyEnded,

#[error("operation failed: {0}")]
Failed(String),
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -113,4 +116,7 @@ pub enum ExecError {

#[error(transparent)]
OpEthApi(#[from] OpEthApiError),

#[error(transparent)]
UnsealedBlock(#[from] UnsealedBlockError),
}
111 changes: 88 additions & 23 deletions based/crates/reth/src/exec.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
use std::{future::Future, sync::Arc};
use std::sync::{Arc, Mutex};

use alloy_consensus::{
BlockBody, Header, Receipt, Transaction,
transaction::{Recovered, SignerRecoverable, TransactionMeta},
};
use alloy_eips::{BlockNumberOrTag, Typed2718, eip2718::Decodable2718};
use alloy_primitives::{B256, BlockNumber, Bytes, Sealable};
use alloy_rpc_types::{Block, Log};
use alloy_primitives::{B256, Bytes, Sealable};
use alloy_rpc_types::Log;
use arc_swap::ArcSwapOption;
use bop_common::{
p2p::{EnvV0, FragV0},
typedefs::Database,
};
use op_alloy_consensus::OpTxEnvelope;
use op_alloy_rpc_types::{OpTransactionReceipt, Transaction as RPCTransaction};
use reth::api::Block as RethBlock;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth::{
api::Block as RethBlock,
network::cache::LruMap,
primitives::{SealedBlock, SealedHeader},
};
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks as _};
use reth_evm::{ConfigureEvm, Evm, op_revm::OpHaltReason};
use reth_optimism_chainspec::OpHardforks;
use reth_optimism_consensus::isthmus;
use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes};
use reth_optimism_primitives::{OpBlock, OpPrimitives, OpReceipt, OpTransactionSigned};
use reth_optimism_rpc::OpReceiptBuilder;
Expand All @@ -27,11 +32,15 @@ use reth_revm::{
database::StateProviderDatabase,
};
use reth_rpc_convert::transaction::ConvertReceiptInput;
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_storage_api::{
BlockReaderIdExt, BlockWriter, CanonChainTracker, DBProvider, DatabaseProviderFactory, StateProviderFactory,
};
use revm::database::CacheDB;

use crate::{error::ExecError, unsealed_block::UnsealedBlock};

const BLOCK_CACHE_LIMIT: u32 = 256;

/// This trait is the ONLY place that needs to know about Reth internals.
/// Everything else is just state-machine + bookkeeping.
pub trait UnsealedExecutor: Send {
Expand All @@ -43,11 +52,11 @@ pub trait UnsealedExecutor: Send {
/// MUST be cumulative: txs execute after all previous frags's txs.
fn execute_frag(&mut self, frag: &FragV0) -> Result<(), ExecError>;

fn seal(&mut self) -> impl Future<Output = Result<(), ExecError>> + Send + '_;
fn seal(&mut self) -> Result<(), ExecError>;

fn set_canonical(&mut self, b: &Block) -> impl Future<Output = Result<(), ExecError>> + Send + '_;
fn set_canonical(&mut self, b: &OpBlock) -> Result<(), ExecError>;

fn get_block(&self, hash: B256, number: BlockNumber) -> impl Future<Output = Result<Block, ExecError>> + Send + '_;
fn get_block(&self, hash: B256) -> Result<OpBlock, ExecError>;

/// Reset overlay state completely.
fn reset(&mut self);
Expand All @@ -56,11 +65,16 @@ pub trait UnsealedExecutor: Send {
pub struct StateExecutor<Client> {
client: Client,
current_unsealed_block: Arc<ArcSwapOption<UnsealedBlock>>,
block_cache: Mutex<LruMap<B256, OpBlock>>,
}

impl<Client> StateExecutor<Client> {
pub fn new(client: Client) -> Self {
Self { client, current_unsealed_block: Arc::new(ArcSwapOption::new(None)) }
Self {
client,
current_unsealed_block: Arc::new(ArcSwapOption::new(None)),
block_cache: Mutex::new(LruMap::new(BLOCK_CACHE_LIMIT)),
}
}

pub fn shared_unsealed_block(&self) -> Arc<ArcSwapOption<UnsealedBlock>> {
Expand All @@ -72,9 +86,12 @@ impl<Client> UnsealedExecutor for StateExecutor<Client>
where
Client: StateProviderFactory
+ ChainSpecProvider<ChainSpec: EthChainSpec<Header = Header> + OpHardforks>
+ BlockReaderIdExt<Header = Header>
+ BlockReaderIdExt<Header = Header, Block = OpBlock>
+ CanonChainTracker<Header = Header>
+ DatabaseProviderFactory
+ Clone
+ 'static,
<Client as DatabaseProviderFactory>::ProviderRW: BlockWriter<Block = OpBlock>,
{
fn ensure_env(&mut self, env: &EnvV0) -> Result<(), ExecError> {
let Some(parent) = self.client.block_by_hash(env.parent_hash)? else {
Expand Down Expand Up @@ -103,7 +120,11 @@ where
self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(parent_header.number))?;
let state_provider_db = StateProviderDatabase::new(state_provider);
let state = State::builder().with_database(state_provider_db).with_bundle_update().build();
let ub = UnsealedBlock::new(env.clone()).with_db_cache(CacheDB::new(state).cache);

// Check if the current block is a prague block
let is_prague = self.client.chain_spec().is_prague_active_at_timestamp(env.timestamp);

let ub = UnsealedBlock::new(env.clone(), is_prague).with_db_cache(CacheDB::new(state).cache);
self.current_unsealed_block.store(Some(Arc::new(ub)));

Ok(())
Expand Down Expand Up @@ -273,7 +294,10 @@ where
}

db = evm.into_db();
ub = ub.with_db_cache(db.cache).with_state_overrides(Some(state_overrides));
ub = ub
.with_db_cache(db.cache)
.with_state_overrides(Some(state_overrides))
.with_bundle_state(db.db.bundle_state);

ub.accept_frag_execution(frag, logs, receipts, gas_used);

Expand All @@ -282,20 +306,61 @@ where
Ok(())
}

fn seal(&mut self) -> impl Future<Output = Result<(), ExecError>> + Send + '_ {
async move { Ok(()) }
fn seal(&mut self) -> Result<(), ExecError> {
let ub = self.current_unsealed_block.load_full().ok_or(ExecError::NotInitialized)?;
let withdrawals_hash = if ub.is_prague {
let canonical_block = ub.env.number.saturating_sub(1);

let state_provider =
self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block)).map_err(|e| {
ExecError::Failed(format!("state_by_block_number_or_tag({canonical_block}) failed: {e}"))
})?;
let bundle_state = ub.get_bundle_state();
Some(isthmus::withdrawals_root(bundle_state, state_provider)?)
} else {
None
};

let block = ub.to_op_block(withdrawals_hash)?;
let sealed = SealedBlock::seal_slow(block);
let recovered = sealed.try_recover().map_err(|e| ExecError::Failed(format!("recover senders: {e}")))?;

let provider_rw = self.client.database_provider_rw()?;
provider_rw.insert_block(recovered)?;
provider_rw.commit()?;
Ok(())
}

fn set_canonical(&mut self, _b: &Block) -> impl Future<Output = Result<(), ExecError>> + Send + '_ {
async move { Ok(()) }
fn set_canonical(&mut self, b: &OpBlock) -> Result<(), ExecError> {
let sealed = SealedHeader::seal_slow(b.header.clone());
self.client.set_canonical_head(sealed);
Ok(())
}

fn get_block(
&self,
_hash: B256,
_number: BlockNumber,
) -> impl Future<Output = Result<Block, ExecError>> + Send + '_ {
async move { Ok(Block::default()) }
fn get_block(&self, hash: B256) -> Result<OpBlock, ExecError> {
if let Some(block) = self
.block_cache
.lock()
.map_err(|_| ExecError::Failed("block_cache mutex poisoned".into()))?
.get(&hash)
.cloned()
{
return Ok(block);
}

// fetch
let block = self
.client
.block_by_hash(hash)
.map_err(|e| ExecError::Failed(format!("block_by_hash failed: {e}")))?
.ok_or_else(|| ExecError::Failed("pre-sealed block not found".into()))?;

self.block_cache
.lock()
.map_err(|_| ExecError::Failed("block_cache mutex poisoned".into()))?
.insert(hash, block.clone());

Ok(block)
}

fn reset(&mut self) {
Expand Down
Loading
Loading