
Commit cdbfbe9

MarcosNicolau, mpaulucci and fmoletta
authored
feat(l1): process blocks in batches when syncing and importing (#2174)
**Motivation**

Accelerate syncing!

**Description**

This PR introduces block batching during full sync:

1. Instead of storing and computing the state root for each block individually, we now maintain a single state trie for the entire batch, committing it only at the end. This yields one state trie per `n` blocks instead of one per block (and therefore less storage).
2. The new full sync process:
   - Request 1024 headers.
   - Request 1024 block bodies and collect them.
   - Once all blocks are received, process them in batches using a single state trie, which is attached to the last block.
3. Blocks are now stored in a single transaction.
4. State root, receipts root, and requests root validation are only required for the last block in the batch.
5. The new `add_blocks_in_batch` function includes a flag, `should_commit_intermediate_tries`. When set to true, it stores the tries for each block. This was added to make the hive tests pass. Currently, this is handled by checking whether the block is within the `STATE_TRIES_TO_KEEP` range. In a real syncing scenario, my intuition is that it would be better to wait until we are fully synced, then start storing the state of new blocks and pruning once we reach `STATE_TRIES_TO_KEEP`.
6. Syncing throughput is now measured per batch.
7. A new command was added to import blocks in batches.

Considerations:

1. ~Optimize account updates: instead of inserting updates into the state trie after each block execution, batch them at the end, merging repeated accounts to reduce insertions and improve performance (see #2216)~ Closes #2216.
2. Improve transaction handling: avoid committing storage tries to the database separately. Instead, create a single transaction for storing receipts, storage tries, and blocks. This would require additional abstractions for transaction management (see #2217).
3. This doesn't work for the `levm` backend yet: we need it to cache the execution state and persist it between blocks, since nothing is stored until the end of the batch (see #2218).
4. In #2210, a new CI job is added to run a benchmark comparing `main` and the `head` branch using `import-in-batch`.

Closes None

---------

Co-authored-by: Martin Paulucci <martin.c.paulucci@gmail.com>
Co-authored-by: fmoletta <99273364+fmoletta@users.noreply.github.com>
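The account-update merging described above (one entry per account across the whole batch, with later blocks overriding earlier ones) can be sketched as a standalone program. This is a simplified, hypothetical model — `u64` stands in for `H160`, and only the fields relevant to merging are kept — not the actual ethrex types:

```rust
use std::collections::HashMap;

// Simplified stand-in for ethrex's AccountUpdate (hypothetical fields).
#[derive(Clone, Debug, PartialEq)]
struct AccountUpdate {
    address: u64,                     // stand-in for H160
    removed: bool,
    code: Option<Vec<u8>>,
    added_storage: HashMap<u64, u64>, // slot -> value
}

// Merge a stream of per-block updates so each account appears once,
// with later blocks overriding earlier ones, as the PR describes.
fn merge_updates(updates: Vec<AccountUpdate>) -> HashMap<u64, AccountUpdate> {
    let mut merged: HashMap<u64, AccountUpdate> = HashMap::new();
    for update in updates {
        // First time we see this account: just store the update.
        let Some(cached) = merged.get_mut(&update.address) else {
            merged.insert(update.address, update);
            continue;
        };
        // Later update wins for scalar fields...
        cached.removed = update.removed;
        if let Some(code) = update.code {
            cached.code = Some(code);
        }
        // ...and its storage writes overwrite earlier slots.
        for (k, v) in update.added_storage {
            cached.added_storage.insert(k, v);
        }
    }
    merged
}

fn main() {
    let block1 = AccountUpdate {
        address: 1,
        removed: false,
        code: None,
        added_storage: HashMap::from([(0, 10)]),
    };
    let block2 = AccountUpdate {
        address: 1,
        removed: false,
        code: Some(vec![0x60]),
        added_storage: HashMap::from([(0, 20), (1, 5)]),
    };
    let merged = merge_updates(vec![block1, block2]);
    // One entry per account; the later block's storage writes win.
    assert_eq!(merged.len(), 1);
    assert_eq!(merged[&1].added_storage[&0], 20);
}
```

With this shape, the state trie receives one insertion per touched account per batch instead of one per account per block.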
1 parent 4c00221 commit cdbfbe9

12 files changed

Lines changed: 598 additions & 139 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 3 deletions
@@ -3,10 +3,14 @@
 ## Perf
 
 #### 2025-03-30
-* Faster block import, use a slice instead of copy
-[#2097](https://github.com/lambdaclass/ethrex/pull/2097)
 
+- Faster block import, use a slice instead of copy
+[#2097](https://github.com/lambdaclass/ethrex/pull/2097)
 
 #### 2025-02-28
 
-* Don't recompute transaction senders when building blocks [#2097](https://github.com/lambdaclass/ethrex/pull/2097)
+- Don't recompute transaction senders when building blocks [#2097](https://github.com/lambdaclass/ethrex/pull/2097)
+
+#### 2025-03-21
+
+- Process blocks in batches when syncing and importing [#2174](https://github.com/lambdaclass/ethrex/pull/2174)

crates/blockchain/blockchain.rs

Lines changed: 179 additions & 41 deletions
@@ -18,12 +18,13 @@ use ethrex_common::types::{
     BlockHeader, BlockNumber, ChainConfig, EIP4844Transaction, Receipt, Transaction,
 };
 
-use ethrex_common::{Address, H256};
+use ethrex_common::{Address, H160, H256};
 use mempool::Mempool;
+use std::collections::HashMap;
 use std::{ops::Div, time::Instant};
 
 use ethrex_storage::error::StoreError;
-use ethrex_storage::Store;
+use ethrex_storage::{AccountUpdate, Store};
 use ethrex_vm::{BlockExecutionResult, Evm, EvmEngine};
 use fork_choice::apply_fork_choice;
 use tracing::{error, info, warn};
@@ -38,6 +39,12 @@ pub struct Blockchain {
     pub mempool: Mempool,
 }
 
+#[derive(Debug, Clone)]
+pub struct BatchBlockProcessingFailure {
+    pub last_valid_hash: H256,
+    pub failed_block_hash: H256,
+}
+
 impl Blockchain {
     pub fn new(evm_engine: EvmEngine, store: Store) -> Self {
         Self {
@@ -55,7 +62,8 @@ impl Blockchain {
         }
     }
 
-    pub fn execute_block(&self, block: &Block) -> Result<BlockExecutionResult, ChainError> {
+    /// Executes a block within a new vm instance and state
+    fn execute_block(&self, block: &Block) -> Result<BlockExecutionResult, ChainError> {
         // Validate if it can be the new head and find the parent
         let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
             // If the parent is not present, we store it as pending.
@@ -74,45 +82,55 @@ impl Blockchain {
         );
         let execution_result = vm.execute_block(block)?;
 
+        // Validate execution went alright
+        validate_gas_used(&execution_result.receipts, &block.header)?;
+        validate_receipts_root(&block.header, &execution_result.receipts)?;
+        validate_requests_hash(&block.header, &chain_config, &execution_result.requests)?;
+
         Ok(execution_result)
     }
 
-    pub fn store_block(
+    /// Executes a block from a given vm instance and does not clear its state
+    fn execute_block_from_state(
         &self,
+        parent_header: &BlockHeader,
         block: &Block,
-        execution_result: BlockExecutionResult,
-    ) -> Result<(), ChainError> {
-        // Assumes block is valid
-        let BlockExecutionResult {
-            receipts,
-            requests,
-            account_updates,
-        } = execution_result;
-        let chain_config = self.storage.get_chain_config()?;
+        chain_config: &ChainConfig,
+        vm: &mut Evm,
+    ) -> Result<BlockExecutionResult, ChainError> {
+        // Validate the block pre-execution
+        validate_block(block, parent_header, chain_config)?;
 
-        let block_hash = block.header.compute_block_hash();
+        let execution_result = vm.execute_block_without_clearing_state(block)?;
 
-        validate_gas_used(&receipts, &block.header)?;
+        // Validate execution went alright
+        validate_gas_used(&execution_result.receipts, &block.header)?;
+        validate_receipts_root(&block.header, &execution_result.receipts)?;
+        validate_requests_hash(&block.header, chain_config, &execution_result.requests)?;
 
+        Ok(execution_result)
+    }
+
+    pub fn store_block(
+        &self,
+        block: &Block,
+        execution_result: BlockExecutionResult,
+    ) -> Result<(), ChainError> {
         // Apply the account updates over the last block's state and compute the new state root
         let new_state_root = self
             .storage
-            .apply_account_updates(block.header.parent_hash, &account_updates)?
+            .apply_account_updates(block.header.parent_hash, &execution_result.account_updates)?
            .ok_or(ChainError::ParentStateNotFound)?;
 
        // Check state root matches the one in block header
        validate_state_root(&block.header, new_state_root)?;
 
-        // Check receipts root matches the one in block header
-        validate_receipts_root(&block.header, &receipts)?;
-
-        // Processes requests from receipts, computes the requests_hash and compares it against the header
-        validate_requests_hash(&block.header, &chain_config, &requests)?;
-
-        store_block(&self.storage, block.clone())?;
-        store_receipts(&self.storage, receipts, block_hash)?;
-
-        Ok(())
+        self.storage
+            .add_block(block.clone())
+            .map_err(ChainError::StoreError)?;
+        self.storage
+            .add_receipts(block.hash(), execution_result.receipts)
+            .map_err(ChainError::StoreError)
     }
 
     pub fn add_block(&self, block: &Block) -> Result<(), ChainError> {
@@ -132,6 +150,141 @@ impl Blockchain {
         result
     }
 
+    /// Adds multiple blocks in a batch.
+    ///
+    /// If an error occurs, returns a tuple containing:
+    /// - The error type ([`ChainError`]).
+    /// - [`BatchBlockProcessingFailure`] (if the error was caused by block processing).
+    ///
+    /// Note: only the last block's state trie is stored in the db
+    pub fn add_blocks_in_batch(
+        &self,
+        blocks: &[Block],
+    ) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> {
+        let mut last_valid_hash = H256::default();
+
+        let Some(first_block_header) = blocks.first().map(|e| e.header.clone()) else {
+            return Err((ChainError::Custom("First block not found".into()), None));
+        };
+
+        let chain_config: ChainConfig = self
+            .storage
+            .get_chain_config()
+            .map_err(|e| (e.into(), None))?;
+        let mut vm = Evm::new(
+            self.evm_engine,
+            self.storage.clone(),
+            first_block_header.parent_hash,
+        );
+
+        let blocks_len = blocks.len();
+        let mut all_receipts: HashMap<BlockHash, Vec<Receipt>> = HashMap::new();
+        let mut all_account_updates: HashMap<H160, AccountUpdate> = HashMap::new();
+        let mut total_gas_used = 0;
+        let mut transactions_count = 0;
+
+        let interval = Instant::now();
+        for (i, block) in blocks.iter().enumerate() {
+            // for the first block, we need to query the store
+            let parent_header = if i == 0 {
+                let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
+                    return Err((
+                        ChainError::ParentNotFound,
+                        Some(BatchBlockProcessingFailure {
+                            failed_block_hash: block.hash(),
+                            last_valid_hash,
+                        }),
+                    ));
+                };
+                parent_header
+            } else {
+                // for the subsequent ones, the parent is the previous block
+                blocks[i - 1].header.clone()
+            };
+
+            let BlockExecutionResult {
+                receipts,
+                account_updates,
+                ..
+            } = match self.execute_block_from_state(&parent_header, block, &chain_config, &mut vm) {
+                Ok(result) => result,
+                Err(err) => {
+                    return Err((
+                        err,
+                        Some(BatchBlockProcessingFailure {
+                            failed_block_hash: block.hash(),
+                            last_valid_hash,
+                        }),
+                    ))
+                }
+            };
+
+            // Merge account updates
+            for account_update in account_updates {
+                let Some(cache) = all_account_updates.get_mut(&account_update.address) else {
+                    all_account_updates.insert(account_update.address, account_update);
+                    continue;
+                };
+
+                cache.removed = account_update.removed;
+                if let Some(code) = account_update.code {
+                    cache.code = Some(code);
+                };
+
+                if let Some(info) = account_update.info {
+                    cache.info = Some(info);
+                }
+
+                for (k, v) in account_update.added_storage.into_iter() {
+                    cache.added_storage.insert(k, v);
+                }
+            }
+
+            last_valid_hash = block.hash();
+            total_gas_used += block.header.gas_used;
+            transactions_count += block.body.transactions.len();
+            all_receipts.insert(block.hash(), receipts);
+        }
+
+        let Some(last_block) = blocks.last() else {
+            return Err((ChainError::Custom("Last block not found".into()), None));
+        };
+
+        // Apply the account updates over all blocks and compute the new state root
+        let new_state_root = self
+            .storage
+            .apply_account_updates(
+                first_block_header.parent_hash,
+                &all_account_updates.into_values().collect::<Vec<_>>(),
+            )
+            .map_err(|e| (e.into(), None))?
+            .ok_or((ChainError::ParentStateNotFound, None))?;
+
+        // Check state root matches the one in block header
+        validate_state_root(&last_block.header, new_state_root).map_err(|e| (e, None))?;
+
+        self.storage
+            .add_blocks(blocks)
+            .map_err(|e| (e.into(), None))?;
+        self.storage
+            .add_receipts_for_blocks(all_receipts)
+            .map_err(|e| (e.into(), None))?;
+
+        let elapsed_total = interval.elapsed().as_millis();
+        let mut throughput = 0.0;
+        if elapsed_total != 0 && total_gas_used != 0 {
+            let as_gigas = (total_gas_used as f64).div(10_f64.powf(9_f64));
+            throughput = (as_gigas) / (elapsed_total as f64) * 1000_f64;
+        }
+
+        info!(
+            "[METRICS] Executed and stored: Range: {}, Total transactions: {}, Throughput: {} Gigagas/s",
+            blocks_len, transactions_count, throughput
+        );
+
+        Ok(())
+    }
+
     //TODO: Forkchoice Update shouldn't be part of this function
     pub fn import_blocks(&self, blocks: &Vec<Block>) {
         let size = blocks.len();
@@ -364,21 +517,6 @@ pub fn validate_requests_hash(
     Ok(())
 }
 
-/// Stores block and header in the database
-pub fn store_block(storage: &Store, block: Block) -> Result<(), ChainError> {
-    storage.add_block(block)?;
-    Ok(())
-}
-
-pub fn store_receipts(
-    storage: &Store,
-    receipts: Vec<Receipt>,
-    block_hash: BlockHash,
-) -> Result<(), ChainError> {
-    storage.add_receipts(block_hash, receipts)?;
-    Ok(())
-}
-
 /// Performs post-execution checks
 pub fn validate_state_root(
     block_header: &BlockHeader,
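The `[METRICS]` line at the end of `add_blocks_in_batch` converts the batch's total gas to gigagas and divides by elapsed milliseconds, scaled to a per-second figure. A minimal standalone sketch of that arithmetic (the function name is ours; values are made up):

```rust
// Throughput as computed in the batch metrics: gas used, converted to
// gigagas (gas / 1e9), divided by elapsed milliseconds, scaled to seconds.
fn throughput_ggas_per_sec(total_gas_used: u64, elapsed_ms: u128) -> f64 {
    // Guard against division by zero and empty batches, as the real code does.
    if elapsed_ms == 0 || total_gas_used == 0 {
        return 0.0;
    }
    let as_gigas = total_gas_used as f64 / 1e9;
    as_gigas / (elapsed_ms as f64) * 1000.0
}

fn main() {
    // 30M gas executed in 100 ms is 0.03 Ggas / 0.1 s = 0.3 Ggas/s.
    println!("{}", throughput_ggas_per_sec(30_000_000, 100));
}
```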

crates/common/trie/trie.rs

Lines changed: 8 additions & 3 deletions
@@ -125,9 +125,7 @@ impl Trie {
     /// Returns keccak(RLP_NULL) if the trie is empty
     /// Also commits changes to the DB
     pub fn hash(&mut self) -> Result<H256, TrieError> {
-        if let Some(ref root) = self.root {
-            self.state.commit(root)?;
-        }
+        self.commit()?;
         Ok(self
             .root
             .as_ref()
@@ -144,6 +142,13 @@ impl Trie {
         .unwrap_or(*EMPTY_TRIE_HASH)
     }
 
+    pub fn commit(&mut self) -> Result<(), TrieError> {
+        if let Some(ref root) = self.root {
+            self.state.commit(root)?;
+        }
+        Ok(())
+    }
+
     /// Obtain a merkle proof for the given path.
     /// The proof will contain all the encoded nodes traversed until reaching the node where the path is stored (including this last node).
     /// The proof will still be constructed even if the path is not stored in the trie, proving its absence.
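The `trie.rs` change is an extract-method refactor: the commit logic moves out of `hash()` into a new public `commit()`, so batch code can flush pending nodes without also computing a root hash. A toy sketch of the shape — hypothetical, simplified types, with `String` standing in for `H256` and a `committed` flag standing in for the DB write:

```rust
// Simplified stand-in for the Trie in crates/common/trie/trie.rs.
struct Trie {
    root: Option<String>,
    committed: bool, // stand-in for "pending nodes flushed to the DB"
}

impl Trie {
    // Extracted method: commit changes without computing a hash.
    fn commit(&mut self) -> Result<(), String> {
        if self.root.is_some() {
            self.committed = true; // real code: self.state.commit(root)?
        }
        Ok(())
    }

    // hash() now delegates to commit() instead of duplicating its body.
    fn hash(&mut self) -> Result<String, String> {
        self.commit()?;
        Ok(self
            .root
            .clone()
            .unwrap_or_else(|| "EMPTY_TRIE_HASH".into()))
    }
}

fn main() {
    let mut trie = Trie { root: Some("abc".into()), committed: false };
    // Hashing still commits, exactly as before the refactor.
    assert_eq!(trie.hash().unwrap(), "abc");
    assert!(trie.committed);
}
```

The behavioral contract of `hash()` is unchanged; the refactor only exposes the commit step separately.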

crates/networking/p2p/peer_handler.rs

Lines changed: 6 additions & 6 deletions
@@ -14,7 +14,9 @@ use crate::{
     kademlia::{KademliaTable, PeerChannels},
     rlpx::{
         eth::{
-            blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders},
+            blocks::{
+                BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
+            },
             receipts::{GetReceipts, Receipts},
         },
         message::Message as RLPxMessage,
@@ -33,14 +35,13 @@ pub const REQUEST_RETRY_ATTEMPTS: usize = 5;
 pub const MAX_RESPONSE_BYTES: u64 = 512 * 1024;
 pub const HASH_MAX: H256 = H256([0xFF; 32]);
 
-// Ask as much as 128 block bodies and 192 block headers per request
-// these magic numbers are not part of the protocol and are taken from geth, see:
+// Ask as much as 128 block bodies per request
+// this magic number is not part of the protocol and is taken from geth, see:
 // https://github.com/ethereum/go-ethereum/blob/2585776aabbd4ae9b00050403b42afb0cee968ec/eth/downloader/downloader.go#L42-L43
 //
 // Note: We noticed that while bigger values are supported
 // increasing them may be the cause of peers disconnection
 pub const MAX_BLOCK_BODIES_TO_REQUEST: usize = 128;
-pub const MAX_BLOCK_HEADERS_TO_REQUEST: usize = 192;
 
 /// An abstraction over the [KademliaTable] containing logic to make requests to peers
 #[derive(Debug, Clone)]
@@ -83,14 +84,13 @@ impl PeerHandler {
         &self,
         start: H256,
         order: BlockRequestOrder,
-        limit: u64,
     ) -> Option<Vec<BlockHeader>> {
         for _ in 0..REQUEST_RETRY_ATTEMPTS {
             let request_id = rand::random();
             let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
                 id: request_id,
                 startblock: start.into(),
-                limit,
+                limit: BLOCK_HEADER_LIMIT,
                 skip: 0,
                 reverse: matches!(order, BlockRequestOrder::NewToOld),
             });
