Skip to content

Commit 3d98502

Browse files
authored
refactor(reaping): use cache for seen hashes instead of in memory store (#2811)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview Use cache instead of in memory store for reaper <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent 2abf5a4 commit 3d98502

File tree

14 files changed

+442
-111
lines changed

14 files changed

+442
-111
lines changed

CHANGELOG.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
# Changelog
32

43
<!--
@@ -10,9 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
109

1110
## [Unreleased]
1211

12+
### Changed
13+
14+
- Use cache instead of in memory store for reaper. Persist cache on reload. Autoclean after 24 hours. ([#2811](https://github.com/evstack/ev-node/pull/2811))
15+
16+
## v1.0.0-beta.9
17+
1318
### Added
1419

1520
<!-- New features or capabilities -->
21+
1622
- Added automated upgrade test for the `evm-single` app that verifies compatibility when moving from v1.0.0-beta.8 to HEAD in CI ([#2780](https://github.com/evstack/ev-node/pull/2780))
1723
- Added execution-layer replay mechanism so nodes can resynchronize by replaying missed batches against the executor ([#2771](https://github.com/evstack/ev-node/pull/2771))
1824
- Added cache-pruning logic that evicts entries once heights are finalized to keep node memory usage bounded ([#2761](https://github.com/evstack/ev-node/pull/2761))
@@ -24,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2430
### Changed
2531

2632
<!-- Changes to existing functionality -->
33+
2734
- Hardened signer CLI and block pipeline per security audit: passphrases must be provided via `--evnode.signer.passphrase_file`, JWT secrets must be provided via `--evm.jwt-secret-file`, data/header validation enforces metadata and timestamp checks, and the reaper backs off on failures (BREAKING) ([#2764](https://github.com/evstack/ev-node/pull/2764))
2835
- Added retries around executor `ExecuteTxs` calls to better tolerate transient execution errors ([#2784](https://github.com/evstack/ev-node/pull/2784))
2936
- Increased default `ReadinessMaxBlocksBehind` from 3 to 30 blocks so `/health/ready` stays true during normal batch sync ([#2779](https://github.com/evstack/ev-node/pull/2779))
@@ -32,11 +39,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3239
### Deprecated
3340

3441
<!-- Features that will be removed in future versions -->
35-
-
3642

3743
### Removed
3844

3945
<!-- Features that were removed -->
46+
4047
- Removed `LastCommitHash`, `ConsensusHash`, and `LastResultsHash` from the canonical header representation in favor of slim headers (BREAKING; legacy hashes now live under `Header.Legacy`) ([#2766](https://github.com/evstack/ev-node/pull/2766))
4148

4249
### Fixed
@@ -46,7 +53,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4653
### Security
4754

4855
<!-- Security vulnerability fixes -->
49-
-
5056

5157
<!--
5258
## Category Guidelines:
@@ -117,4 +123,4 @@ Pre-release versions: 0.x.y (anything may change)
117123
-->
118124

119125
<!-- Links -->
120-
[Unreleased]: https://github.com/evstack/ev-node/compare/v1.0.0-beta.1...HEAD
126+
- [Unreleased]: https://github.com/evstack/ev-node/compare/v1.0.0-beta.1...HEAD

block/components.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ func NewAggregatorComponents(
236236
genesis,
237237
logger,
238238
executor,
239+
cacheManager,
239240
reaping.DefaultInterval,
240241
)
241242
if err != nil {

block/internal/cache/bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ ok github.com/evstack/ev-node/block/internal/cache 25.834s
2828
*/
2929

3030
func benchSetupStore(b *testing.B, n int, txsPer int, chainID string) store.Store {
31-
ds, err := store.NewDefaultInMemoryKVStore()
31+
ds, err := store.NewTestInMemoryKVStore()
3232
if err != nil {
3333
b.Fatal(err)
3434
}

block/internal/cache/manager.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"path/filepath"
99
"sync"
10+
"time"
1011

1112
"github.com/rs/zerolog"
1213

@@ -21,6 +22,10 @@ var (
2122
headerCacheDir = filepath.Join(cacheDir, "header")
2223
dataCacheDir = filepath.Join(cacheDir, "data")
2324
pendingEventsCacheDir = filepath.Join(cacheDir, "pending_da_events")
25+
txCacheDir = filepath.Join(cacheDir, "tx")
26+
27+
// DefaultTxCacheRetention is the default time to keep transaction hashes in cache
28+
DefaultTxCacheRetention = 24 * time.Hour
2429
)
2530

2631
// gobRegisterOnce ensures gob type registration happens exactly once process-wide.
@@ -51,6 +56,11 @@ type Manager interface {
5156
GetDataDAIncluded(hash string) (uint64, bool)
5257
SetDataDAIncluded(hash string, daHeight uint64, blockHeight uint64)
5358

59+
// Transaction operations
60+
IsTxSeen(hash string) bool
61+
SetTxSeen(hash string)
62+
CleanupOldTxs(olderThan time.Duration) int
63+
5464
// Pending operations
5565
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
5666
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
@@ -78,6 +88,8 @@ var _ Manager = (*implementation)(nil)
7888
type implementation struct {
7989
headerCache *Cache[types.SignedHeader]
8090
dataCache *Cache[types.Data]
91+
txCache *Cache[struct{}]
92+
txTimestamps *sync.Map // map[string]time.Time - tracks when each tx was seen
8193
pendingEventsCache *Cache[common.DAHeightEvent]
8294
pendingHeaders *PendingHeaders
8395
pendingData *PendingData
@@ -90,6 +102,7 @@ func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Ma
90102
// Initialize caches
91103
headerCache := NewCache[types.SignedHeader]()
92104
dataCache := NewCache[types.Data]()
105+
txCache := NewCache[struct{}]()
93106
pendingEventsCache := NewCache[common.DAHeightEvent]()
94107

95108
// Initialize pending managers
@@ -107,6 +120,8 @@ func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Ma
107120
impl := &implementation{
108121
headerCache: headerCache,
109122
dataCache: dataCache,
123+
txCache: txCache,
124+
txTimestamps: new(sync.Map),
110125
pendingEventsCache: pendingEventsCache,
111126
pendingHeaders: pendingHeaders,
112127
pendingData: pendingData,
@@ -167,12 +182,71 @@ func (m *implementation) SetDataDAIncluded(hash string, daHeight uint64, blockHe
167182
m.dataCache.setDAIncluded(hash, daHeight, blockHeight)
168183
}
169184

185+
// Transaction operations
186+
func (m *implementation) IsTxSeen(hash string) bool {
187+
return m.txCache.isSeen(hash)
188+
}
189+
190+
func (m *implementation) SetTxSeen(hash string) {
191+
// Use 0 as height since transactions don't have a block height yet
192+
m.txCache.setSeen(hash, 0)
193+
// Track timestamp for cleanup purposes
194+
m.txTimestamps.Store(hash, time.Now())
195+
}
196+
197+
// CleanupOldTxs removes transaction hashes older than the specified duration.
198+
// Returns the number of transactions removed.
199+
// This prevents unbounded growth of the transaction cache.
200+
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
201+
if olderThan <= 0 {
202+
olderThan = DefaultTxCacheRetention
203+
}
204+
205+
cutoff := time.Now().Add(-olderThan)
206+
removed := 0
207+
208+
m.txTimestamps.Range(func(key, value any) bool {
209+
hash, ok := key.(string)
210+
if !ok {
211+
return true
212+
}
213+
214+
timestamp, ok := value.(time.Time)
215+
if !ok {
216+
return true
217+
}
218+
219+
if timestamp.Before(cutoff) {
220+
// Remove from both caches
221+
m.txCache.hashes.Delete(hash)
222+
m.txTimestamps.Delete(hash)
223+
removed++
224+
}
225+
return true
226+
})
227+
228+
if removed > 0 {
229+
m.logger.Debug().
230+
Int("removed", removed).
231+
Dur("older_than", olderThan).
232+
Msg("cleaned up old transaction hashes from cache")
233+
}
234+
235+
return removed
236+
}
237+
170238
// DeleteHeight removes from all caches the given height.
171239
// This can be done when a height has been da included.
172240
func (m *implementation) DeleteHeight(blockHeight uint64) {
173241
m.headerCache.deleteAllForHeight(blockHeight)
174242
m.dataCache.deleteAllForHeight(blockHeight)
175243
m.pendingEventsCache.deleteAllForHeight(blockHeight)
244+
245+
// Note: txCache is intentionally NOT deleted here because:
246+
// 1. Transactions are tracked by hash, not by block height (they use height 0)
247+
// 2. A transaction seen at one height may be resubmitted at a different height
248+
// 3. The cache prevents duplicate submissions across block heights
249+
// 4. Cleanup is handled separately via CleanupOldTxs() based on time, not height
176250
}
177251

178252
// Pending operations
@@ -246,10 +320,18 @@ func (m *implementation) SaveToDisk() error {
246320
return fmt.Errorf("failed to save data cache to disk: %w", err)
247321
}
248322

323+
if err := m.txCache.SaveToDisk(filepath.Join(cfgDir, txCacheDir)); err != nil {
324+
return fmt.Errorf("failed to save tx cache to disk: %w", err)
325+
}
326+
249327
if err := m.pendingEventsCache.SaveToDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil {
250328
return fmt.Errorf("failed to save pending events cache to disk: %w", err)
251329
}
252330

331+
// Note: txTimestamps are not persisted to disk intentionally.
332+
// On restart, all cached transactions will be treated as "new" for cleanup purposes,
333+
// which is acceptable as they will be cleaned up on the next cleanup cycle if old enough.
334+
253335
return nil
254336
}
255337

@@ -267,10 +349,24 @@ func (m *implementation) LoadFromDisk() error {
267349
return fmt.Errorf("failed to load data cache from disk: %w", err)
268350
}
269351

352+
if err := m.txCache.LoadFromDisk(filepath.Join(cfgDir, txCacheDir)); err != nil {
353+
return fmt.Errorf("failed to load tx cache from disk: %w", err)
354+
}
355+
270356
if err := m.pendingEventsCache.LoadFromDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil {
271357
return fmt.Errorf("failed to load pending events cache from disk: %w", err)
272358
}
273359

360+
// After loading tx cache from disk, initialize timestamps for loaded transactions
361+
// Set them to current time so they won't be immediately cleaned up
362+
now := time.Now()
363+
m.txCache.hashes.Range(func(key, value any) bool {
364+
if hash, ok := key.(string); ok {
365+
m.txTimestamps.Store(hash, now)
366+
}
367+
return true
368+
})
369+
274370
return nil
275371
}
276372

0 commit comments

Comments
 (0)