diff --git a/go.mod b/go.mod index 7ee1cc1047..cecbeef630 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ retract v0.12.0 // Published by accident require ( github.com/celestiaorg/utils v0.1.0 github.com/cometbft/cometbft v0.38.15 - github.com/cosmos/gogoproto v1.7.0 + github.com/cosmos/gogoproto v1.7.0 // indirect github.com/go-kit/kit v0.13.0 github.com/gogo/protobuf v1.3.2 github.com/gorilla/websocket v1.5.3 // indirect diff --git a/mempool/cache.go b/mempool/cache.go deleted file mode 100644 index d29357871e..0000000000 --- a/mempool/cache.go +++ /dev/null @@ -1,121 +0,0 @@ -package mempool - -import ( - "container/list" - - "sync" - - "github.com/cometbft/cometbft/types" -) - -// TxCache defines an interface for raw transaction caching in a mempool. -// Currently, a TxCache does not allow direct reading or getting of transaction -// values. A TxCache is used primarily to push transactions and removing -// transactions. Pushing via Push returns a boolean telling the caller if the -// transaction already exists in the cache or not. -type TxCache interface { - // Reset resets the cache to an empty state. - Reset() - - // Push adds the given raw transaction to the cache and returns true if it was - // newly added. Otherwise, it returns false. - Push(tx types.Tx) bool - - // Remove removes the given raw transaction from the cache. - Remove(tx types.Tx) - - // Has reports whether tx is present in the cache. Checking for presence is - // not treated as an access of the value. - Has(tx types.Tx) bool -} - -var _ TxCache = (*LRUTxCache)(nil) - -// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache -// only stores the hash of the raw transaction. -type LRUTxCache struct { - mtx sync.Mutex - size int - cacheMap map[types.TxKey]*list.Element - list *list.List -} - -func NewLRUTxCache(cacheSize int) *LRUTxCache { - return &LRUTxCache{ - size: cacheSize, - cacheMap: make(map[types.TxKey]*list.Element, cacheSize), - list: list.New(), - } -} - -// GetList returns the underlying linked-list that backs the LRU cache. Note, -// this should be used for testing purposes only! -func (c *LRUTxCache) GetList() *list.List { - return c.list -} - -func (c *LRUTxCache) Reset() { - c.mtx.Lock() - defer c.mtx.Unlock() - - c.cacheMap = make(map[types.TxKey]*list.Element, c.size) - c.list.Init() -} - -func (c *LRUTxCache) Push(tx types.Tx) bool { - c.mtx.Lock() - defer c.mtx.Unlock() - - key := tx.Key() - - moved, ok := c.cacheMap[key] - if ok { - c.list.MoveToBack(moved) - return false - } - - if c.list.Len() >= c.size { - front := c.list.Front() - if front != nil { - frontKey := front.Value.(types.TxKey) - delete(c.cacheMap, frontKey) - c.list.Remove(front) - } - } - - e := c.list.PushBack(key) - c.cacheMap[key] = e - - return true -} - -func (c *LRUTxCache) Remove(tx types.Tx) { - c.mtx.Lock() - defer c.mtx.Unlock() - - key := tx.Key() - e := c.cacheMap[key] - delete(c.cacheMap, key) - - if e != nil { - c.list.Remove(e) - } -} - -func (c *LRUTxCache) Has(tx types.Tx) bool { - c.mtx.Lock() - defer c.mtx.Unlock() - - _, ok := c.cacheMap[tx.Key()] - return ok -} - -// NopTxCache defines a no-op raw transaction cache. -type NopTxCache struct{} - -var _ TxCache = (*NopTxCache)(nil) - -func (NopTxCache) Reset() {} -func (NopTxCache) Push(types.Tx) bool { return true } -func (NopTxCache) Remove(types.Tx) {} -func (NopTxCache) Has(types.Tx) bool { return false } diff --git a/mempool/cache_bench_test.go b/mempool/cache_bench_test.go deleted file mode 100644 index 1c26999d10..0000000000 --- a/mempool/cache_bench_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package mempool - -import ( - "encoding/binary" - "testing" -) - -func BenchmarkCacheInsertTime(b *testing.B) { - cache := NewLRUTxCache(b.N) - - txs := make([][]byte, b.N) - for i := 0; i < b.N; i++ { - txs[i] = make([]byte, 8) - binary.BigEndian.PutUint64(txs[i], uint64(i)) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - cache.Push(txs[i]) - } -} - -// This benchmark is probably skewed, since we actually will be removing -// txs in parallel, which may cause some overhead due to mutex locking. -func BenchmarkCacheRemoveTime(b *testing.B) { - cache := NewLRUTxCache(b.N) - - txs := make([][]byte, b.N) - for i := 0; i < b.N; i++ { - txs[i] = make([]byte, 8) - binary.BigEndian.PutUint64(txs[i], uint64(i)) - cache.Push(txs[i]) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - cache.Remove(txs[i]) - } -} diff --git a/mempool/cache_test.go b/mempool/cache_test.go deleted file mode 100644 index 44b2beb014..0000000000 --- a/mempool/cache_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package mempool - -import ( - "crypto/rand" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestCacheRemove(t *testing.T) { - cache := NewLRUTxCache(100) - numTxs := 10 - - txs := make([][]byte, numTxs) - for i := 0; i < numTxs; i++ { - // probability of collision is 2**-256 - txBytes := make([]byte, 32) - _, err := rand.Read(txBytes) - require.NoError(t, err) - - txs[i] = txBytes - cache.Push(txBytes) - - // make sure its added to both the linked list and the map - require.Equal(t, i+1, len(cache.cacheMap)) - require.Equal(t, i+1, cache.list.Len()) - } - - for i := 0; i < numTxs; i++ { - cache.Remove(txs[i]) - // make sure its removed from both the map and the linked list - require.Equal(t, numTxs-(i+1), len(cache.cacheMap)) - require.Equal(t, numTxs-(i+1), cache.list.Len()) - } -} diff --git a/mempool/clist/bench_test.go b/mempool/clist/bench_test.go deleted file mode 100644 index 95973cc767..0000000000 --- a/mempool/clist/bench_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package clist - -import "testing" - -func BenchmarkDetaching(b *testing.B) { - lst := New() - for i := 0; i < b.N+1; i++ { - lst.PushBack(i) - } - start := lst.Front() - nxt := start.Next() - b.ResetTimer() - for i := 0; i < b.N; i++ { - start.removed = true - start.DetachNext() - start.DetachPrev() - tmp := nxt - nxt = nxt.Next() - start = tmp - } -} - -// This is used to benchmark the time of RMutex. -func BenchmarkRemoved(b *testing.B) { - lst := New() - for i := 0; i < b.N+1; i++ { - lst.PushBack(i) - } - start := lst.Front() - nxt := start.Next() - b.ResetTimer() - for i := 0; i < b.N; i++ { - start.Removed() - tmp := nxt - nxt = nxt.Next() - start = tmp - } -} - -func BenchmarkPushBack(b *testing.B) { - lst := New() - b.ResetTimer() - for i := 0; i < b.N; i++ { - lst.PushBack(i) - } -} diff --git a/mempool/clist/clist.go b/mempool/clist/clist.go deleted file mode 100644 index e565cc3099..0000000000 --- a/mempool/clist/clist.go +++ /dev/null @@ -1,405 +0,0 @@ -package clist - -/* - -The purpose of CList is to provide a goroutine-safe linked-list. -This list can be traversed concurrently by any number of goroutines. -However, removed CElements cannot be added back. -NOTE: Not all methods of container/list are (yet) implemented. -NOTE: Removed elements need to DetachPrev or DetachNext consistently -to ensure garbage collection of removed elements. - -*/ - -import ( - "fmt" - "sync" -) - -// MaxLength is the max allowed number of elements a linked list is -// allowed to contain. -// If more elements are pushed to the list it will panic. -const MaxLength = int(^uint(0) >> 1) - -/* -CElement is an element of a linked-list -Traversal from a CElement is goroutine-safe. - -We can't avoid using WaitGroups or for-loops given the documentation -spec without re-implementing the primitives that already exist in -golang/sync. Notice that WaitGroup allows many go-routines to be -simultaneously released, which is what we want. Mutex doesn't do -this. RWMutex does this, but it's clumsy to use in the way that a -WaitGroup would be used -- and we'd end up having two RWMutex's for -prev/next each, which is doubly confusing. - -sync.Cond would be sort-of useful, but we don't need a write-lock in -the for-loop. Use sync.Cond when you need serial access to the -"condition". In our case our condition is if `next != nil || removed`, -and there's no reason to serialize that condition for goroutines -waiting on NextWait() (since it's just a read operation). -*/ -type CElement struct { - mtx sync.RWMutex - prev *CElement - prevWg *sync.WaitGroup - prevWaitCh chan struct{} - next *CElement - nextWg *sync.WaitGroup - nextWaitCh chan struct{} - removed bool - - Value interface{} // immutable -} - -// Blocking implementation of Next(). -// May return nil iff CElement was tail and got removed. -func (e *CElement) NextWait() *CElement { - for { - e.mtx.RLock() - next := e.next - nextWg := e.nextWg - removed := e.removed - e.mtx.RUnlock() - - if next != nil || removed { - return next - } - - nextWg.Wait() - // e.next doesn't necessarily exist here. - // That's why we need to continue a for-loop. - } -} - -// Blocking implementation of Prev(). -// May return nil iff CElement was head and got removed. -func (e *CElement) PrevWait() *CElement { - for { - e.mtx.RLock() - prev := e.prev - prevWg := e.prevWg - removed := e.removed - e.mtx.RUnlock() - - if prev != nil || removed { - return prev - } - - prevWg.Wait() - } -} - -// PrevWaitChan can be used to wait until Prev becomes not nil. Once it does, -// channel will be closed. -func (e *CElement) PrevWaitChan() <-chan struct{} { - e.mtx.RLock() - defer e.mtx.RUnlock() - - return e.prevWaitCh -} - -// NextWaitChan can be used to wait until Next becomes not nil. Once it does, -// channel will be closed. -func (e *CElement) NextWaitChan() <-chan struct{} { - e.mtx.RLock() - defer e.mtx.RUnlock() - - return e.nextWaitCh -} - -// Nonblocking, may return nil if at the end. -func (e *CElement) Next() *CElement { - e.mtx.RLock() - val := e.next - e.mtx.RUnlock() - return val -} - -// Nonblocking, may return nil if at the end. -func (e *CElement) Prev() *CElement { - e.mtx.RLock() - prev := e.prev - e.mtx.RUnlock() - return prev -} - -func (e *CElement) Removed() bool { - e.mtx.RLock() - isRemoved := e.removed - e.mtx.RUnlock() - return isRemoved -} - -func (e *CElement) DetachNext() { - e.mtx.Lock() - if !e.removed { - e.mtx.Unlock() - panic("DetachNext() must be called after Remove(e)") - } - e.next = nil - e.mtx.Unlock() -} - -func (e *CElement) DetachPrev() { - e.mtx.Lock() - if !e.removed { - e.mtx.Unlock() - panic("DetachPrev() must be called after Remove(e)") - } - e.prev = nil - e.mtx.Unlock() -} - -// NOTE: This function needs to be safe for -// concurrent goroutines waiting on nextWg. -func (e *CElement) SetNext(newNext *CElement) { - e.mtx.Lock() - - oldNext := e.next - e.next = newNext - if oldNext != nil && newNext == nil { - // See https://golang.org/pkg/sync/: - // - // If a WaitGroup is reused to wait for several independent sets of - // events, new Add calls must happen after all previous Wait calls have - // returned. - e.nextWg = waitGroup1() // WaitGroups are difficult to re-use. - e.nextWaitCh = make(chan struct{}) - } - if oldNext == nil && newNext != nil { - e.nextWg.Done() - close(e.nextWaitCh) - } - e.mtx.Unlock() -} - -// NOTE: This function needs to be safe for -// concurrent goroutines waiting on prevWg -func (e *CElement) SetPrev(newPrev *CElement) { - e.mtx.Lock() - - oldPrev := e.prev - e.prev = newPrev - if oldPrev != nil && newPrev == nil { - e.prevWg = waitGroup1() // WaitGroups are difficult to re-use. - e.prevWaitCh = make(chan struct{}) - } - if oldPrev == nil && newPrev != nil { - e.prevWg.Done() - close(e.prevWaitCh) - } - e.mtx.Unlock() -} - -func (e *CElement) SetRemoved() { - e.mtx.Lock() - - e.removed = true - - // This wakes up anyone waiting in either direction. - if e.prev == nil { - e.prevWg.Done() - close(e.prevWaitCh) - } - if e.next == nil { - e.nextWg.Done() - close(e.nextWaitCh) - } - e.mtx.Unlock() -} - -//-------------------------------------------------------------------------------- - -// CList represents a linked list. -// The zero value for CList is an empty list ready to use. -// Operations are goroutine-safe. -// Panics if length grows beyond the max. -type CList struct { - mtx sync.RWMutex - wg *sync.WaitGroup - waitCh chan struct{} - head *CElement // first element - tail *CElement // last element - len int // list length - maxLen int // max list length -} - -func (l *CList) Init() *CList { - l.mtx.Lock() - - l.wg = waitGroup1() - l.waitCh = make(chan struct{}) - l.head = nil - l.tail = nil - l.len = 0 - l.mtx.Unlock() - return l -} - -// Return CList with MaxLength. CList will panic if it goes beyond MaxLength. -func New() *CList { return newWithMax(MaxLength) } - -// Return CList with given maxLength. -// Will panic if list exceeds given maxLength. -func newWithMax(maxLength int) *CList { - l := new(CList) - l.maxLen = maxLength - return l.Init() -} - -func (l *CList) Len() int { - l.mtx.RLock() - len := l.len - l.mtx.RUnlock() - return len -} - -func (l *CList) Front() *CElement { - l.mtx.RLock() - head := l.head - l.mtx.RUnlock() - return head -} - -func (l *CList) FrontWait() *CElement { - // Loop until the head is non-nil else wait and try again - for { - l.mtx.RLock() - head := l.head - wg := l.wg - l.mtx.RUnlock() - - if head != nil { - return head - } - wg.Wait() - // NOTE: If you think l.head exists here, think harder. - } -} - -func (l *CList) Back() *CElement { - l.mtx.RLock() - back := l.tail - l.mtx.RUnlock() - return back -} - -func (l *CList) BackWait() *CElement { - for { - l.mtx.RLock() - tail := l.tail - wg := l.wg - l.mtx.RUnlock() - - if tail != nil { - return tail - } - wg.Wait() - // l.tail doesn't necessarily exist here. - // That's why we need to continue a for-loop. - } -} - -// WaitChan can be used to wait until Front or Back becomes not nil. Once it -// does, channel will be closed. -func (l *CList) WaitChan() <-chan struct{} { - l.mtx.Lock() - defer l.mtx.Unlock() - - return l.waitCh -} - -// Panics if list grows beyond its max length. -func (l *CList) PushBack(v interface{}) *CElement { - l.mtx.Lock() - - // Construct a new element - e := &CElement{ - prev: nil, - prevWg: waitGroup1(), - prevWaitCh: make(chan struct{}), - next: nil, - nextWg: waitGroup1(), - nextWaitCh: make(chan struct{}), - removed: false, - Value: v, - } - - // Release waiters on FrontWait/BackWait maybe - if l.len == 0 { - l.wg.Done() - close(l.waitCh) - } - if l.len >= l.maxLen { - panic(fmt.Sprintf("clist: maximum length list reached %d", l.maxLen)) - } - l.len++ - - // Modify the tail - if l.tail == nil { - l.head = e - l.tail = e - } else { - e.SetPrev(l.tail) // We must init e first. - l.tail.SetNext(e) // This will make e accessible. - l.tail = e // Update the list. - } - l.mtx.Unlock() - return e -} - -// CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks. -// NOTE: As per the contract of CList, removed elements cannot be added back. -func (l *CList) Remove(e *CElement) interface{} { - l.mtx.Lock() - - prev := e.Prev() - next := e.Next() - - if l.head == nil || l.tail == nil { - l.mtx.Unlock() - panic("Remove(e) on empty CList") - } - if prev == nil && l.head != e { - l.mtx.Unlock() - panic("Remove(e) with false head") - } - if next == nil && l.tail != e { - l.mtx.Unlock() - panic("Remove(e) with false tail") - } - - // If we're removing the only item, make CList FrontWait/BackWait wait. - if l.len == 1 { - l.wg = waitGroup1() // WaitGroups are difficult to re-use. - l.waitCh = make(chan struct{}) - } - - // Update l.len - l.len-- - - // Connect next/prev and set head/tail - if prev == nil { - l.head = next - } else { - prev.SetNext(next) - } - if next == nil { - l.tail = prev - } else { - next.SetPrev(prev) - } - - // Set .Done() on e, otherwise waiters will wait forever. - e.SetRemoved() - - l.mtx.Unlock() - return e.Value -} - -func waitGroup1() (wg *sync.WaitGroup) { - wg = &sync.WaitGroup{} - wg.Add(1) - return -} diff --git a/mempool/clist/clist_test.go b/mempool/clist/clist_test.go deleted file mode 100644 index 02ec4caf63..0000000000 --- a/mempool/clist/clist_test.go +++ /dev/null @@ -1,310 +0,0 @@ -package clist - -import ( - "fmt" - "runtime" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - cmrand "github.com/cometbft/cometbft/libs/rand" -) - -func TestPanicOnMaxLength(t *testing.T) { - maxLength := 1000 - - l := newWithMax(maxLength) - for i := 0; i < maxLength; i++ { - l.PushBack(1) - } - assert.Panics(t, func() { - l.PushBack(1) - }) -} - -func TestSmall(t *testing.T) { - l := New() - el1 := l.PushBack(1) - el2 := l.PushBack(2) - el3 := l.PushBack(3) - if l.Len() != 3 { - t.Error("Expected len 3, got ", l.Len()) - } - - // fmt.Printf("%p %v\n", el1, el1) - // fmt.Printf("%p %v\n", el2, el2) - // fmt.Printf("%p %v\n", el3, el3) - - r1 := l.Remove(el1) - - // fmt.Printf("%p %v\n", el1, el1) - // fmt.Printf("%p %v\n", el2, el2) - // fmt.Printf("%p %v\n", el3, el3) - - r2 := l.Remove(el2) - - // fmt.Printf("%p %v\n", el1, el1) - // fmt.Printf("%p %v\n", el2, el2) - // fmt.Printf("%p %v\n", el3, el3) - - r3 := l.Remove(el3) - - if r1 != 1 { - t.Error("Expected 1, got ", r1) - } - if r2 != 2 { - t.Error("Expected 2, got ", r2) - } - if r3 != 3 { - t.Error("Expected 3, got ", r3) - } - if l.Len() != 0 { - t.Error("Expected len 0, got ", l.Len()) - } - -} - -// This test is quite hacky because it relies on SetFinalizer -// which isn't guaranteed to run at all. -func _TestGCFifo(t *testing.T) { - if runtime.GOARCH != "amd64" { - t.Skipf("Skipping on non-amd64 machine") - } - - const numElements = 1000000 - l := New() - gcCount := new(uint64) - - // SetFinalizer doesn't work well with circular structures, - // so we construct a trivial non-circular structure to - // track. - type value struct { - Int int - } - done := make(chan struct{}) - - for i := 0; i < numElements; i++ { - v := new(value) - v.Int = i - l.PushBack(v) - runtime.SetFinalizer(v, func(v *value) { - atomic.AddUint64(gcCount, 1) - }) - } - - for el := l.Front(); el != nil; { - l.Remove(el) - // oldEl := el - el = el.Next() - // oldEl.DetachPrev() - // oldEl.DetachNext() - } - - runtime.GC() - time.Sleep(time.Second * 3) - runtime.GC() - time.Sleep(time.Second * 3) - _ = done - - if *gcCount != numElements { - t.Errorf("expected gcCount to be %v, got %v", numElements, - *gcCount) - } -} - -// This test is quite hacky because it relies on SetFinalizer -// which isn't guaranteed to run at all. -func _TestGCRandom(t *testing.T) { - if runtime.GOARCH != "amd64" { - t.Skipf("Skipping on non-amd64 machine") - } - - const numElements = 1000000 - l := New() - gcCount := 0 - - // SetFinalizer doesn't work well with circular structures, - // so we construct a trivial non-circular structure to - // track. - type value struct { - Int int - } - - for i := 0; i < numElements; i++ { - v := new(value) - v.Int = i - l.PushBack(v) - runtime.SetFinalizer(v, func(v *value) { - gcCount++ - }) - } - - els := make([]*CElement, 0, numElements) - for el := l.Front(); el != nil; el = el.Next() { - els = append(els, el) - } - - for _, i := range cmrand.Perm(numElements) { - el := els[i] - l.Remove(el) - _ = el.Next() - } - - runtime.GC() - time.Sleep(time.Second * 3) - - if gcCount != numElements { - t.Errorf("expected gcCount to be %v, got %v", numElements, - gcCount) - } -} - -func TestScanRightDeleteRandom(t *testing.T) { - - const numElements = 1000 - const numTimes = 100 - const numScanners = 10 - - l := New() - stop := make(chan struct{}) - - els := make([]*CElement, numElements) - for i := 0; i < numElements; i++ { - el := l.PushBack(i) - els[i] = el - } - - // Launch scanner routines that will rapidly iterate over elements. - for i := 0; i < numScanners; i++ { - go func(scannerID int) { - var el *CElement - restartCounter := 0 - counter := 0 - FOR_LOOP: - for { - select { - case <-stop: - fmt.Println("stopped") - break FOR_LOOP - default: - } - if el == nil { - el = l.FrontWait() - restartCounter++ - } - el = el.Next() - counter++ - } - fmt.Printf("Scanner %v restartCounter: %v counter: %v\n", scannerID, restartCounter, counter) - }(i) - } - - // Remove an element, push back an element. - for i := 0; i < numTimes; i++ { - // Pick an element to remove - rmElIdx := cmrand.Intn(len(els)) - rmEl := els[rmElIdx] - - // Remove it - l.Remove(rmEl) - // fmt.Print(".") - - // Insert a new element - newEl := l.PushBack(-1*i - 1) - els[rmElIdx] = newEl - - if i%100000 == 0 { - fmt.Printf("Pushed %vK elements so far...\n", i/1000) - } - - } - - // Stop scanners - close(stop) - // time.Sleep(time.Second * 1) - - // And remove all the elements. - for el := l.Front(); el != nil; el = el.Next() { - l.Remove(el) - } - if l.Len() != 0 { - t.Fatal("Failed to remove all elements from CList") - } -} - -func TestWaitChan(t *testing.T) { - l := New() - ch := l.WaitChan() - - // 1) add one element to an empty list - go l.PushBack(1) - <-ch - - // 2) and remove it - el := l.Front() - v := l.Remove(el) - if v != 1 { - t.Fatal("where is 1 coming from?") - } - - // 3) test iterating forward and waiting for Next (NextWaitChan and Next) - el = l.PushBack(0) - - done := make(chan struct{}) - pushed := 0 - go func() { - for i := 1; i < 100; i++ { - l.PushBack(i) - pushed++ - time.Sleep(time.Duration(cmrand.Intn(25)) * time.Millisecond) - } - // apply a deterministic pause so the counter has time to catch up - time.Sleep(25 * time.Millisecond) - close(done) - }() - - next := el - seen := 0 -FOR_LOOP: - for { - select { - case <-next.NextWaitChan(): - next = next.Next() - seen++ - if next == nil { - t.Fatal("Next should not be nil when waiting on NextWaitChan") - } - case <-done: - break FOR_LOOP - case <-time.After(10 * time.Second): - t.Fatal("max execution time") - } - } - - if pushed != seen { - t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen) - } - - // 4) test iterating backwards (PrevWaitChan and Prev) - prev := next - seen = 0 -FOR_LOOP2: - for { - select { - case <-prev.PrevWaitChan(): - prev = prev.Prev() - seen++ - if prev == nil { - t.Fatal("expected PrevWaitChan to block forever on nil when reached first elem") - } - case <-time.After(3 * time.Second): - break FOR_LOOP2 - } - } - - if pushed != seen { - t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen) - } -} diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go deleted file mode 100644 index 8d01a6905b..0000000000 --- a/mempool/clist_mempool.go +++ /dev/null @@ -1,687 +0,0 @@ -package mempool - -import ( - "bytes" - "context" - "errors" - "sync" - "sync/atomic" - - abci "github.com/cometbft/cometbft/abci/types" - "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/libs/log" - "github.com/cometbft/cometbft/p2p" - "github.com/cometbft/cometbft/proxy" - "github.com/cometbft/cometbft/types" - - "github.com/rollkit/rollkit/mempool/clist" -) - -// CListMempool is an ordered in-memory pool for transactions before they are -// proposed in a consensus round. Transaction validity is checked using the -// CheckTx abci message before the transaction is added to the pool. The -// mempool uses a concurrent list structure for storing transactions that can -// be efficiently accessed by multiple concurrent readers. -type CListMempool struct { - // Atomic integers - height uint64 // the last block Update()'d to - txsBytes int64 // total size of mempool, in bytes - - // notify listeners (ie. consensus) when txs are available - notifiedTxsAvailable bool - txAvailMtx sync.Mutex - txsAvailable chan struct{} // fires once for each height, when the mempool is not empty - - config *config.MempoolConfig - - // Exclusive mutex for Update method to prevent concurrent execution of - // CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods. - updateMtx sync.RWMutex - preCheck PreCheckFunc - postCheck PostCheckFunc - - txs *clist.CList // concurrent linked-list of good txs - proxyAppConn proxy.AppConnMempool - - // Track whether we're rechecking txs. - // These are not protected by a mutex and are expected to be mutated in - // serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - - // Map for quick access to txs to record sender in CheckTx. - // txsMap: txKey -> CElement - txsMap sync.Map - - // Keep a cache of already-seen txs. - // This reduces the pressure on the proxyApp. - cache TxCache - - logger log.Logger - metrics *Metrics -} - -var _ Mempool = &CListMempool{} - -// CListMempoolOption sets an optional parameter on the mempool. -type CListMempoolOption func(*CListMempool) - -// NewCListMempool returns a new mempool with the given configuration and -// connection to an application. -func NewCListMempool( - cfg *config.MempoolConfig, - proxyAppConn proxy.AppConnMempool, - height uint64, - options ...CListMempoolOption, -) *CListMempool { - mp := &CListMempool{ - config: cfg, - proxyAppConn: proxyAppConn, - txs: clist.New(), - height: height, - recheckCursor: nil, - recheckEnd: nil, - logger: log.NewNopLogger(), - metrics: NopMetrics(), - } - - if cfg.CacheSize > 0 { - mp.cache = NewLRUTxCache(cfg.CacheSize) - } else { - mp.cache = NopTxCache{} - } - - proxyAppConn.SetResponseCallback(mp.globalCb) - - for _, option := range options { - option(mp) - } - - return mp -} - -// NOTE: not thread safe - should only be called once, on startup -func (mem *CListMempool) EnableTxsAvailable() { - mem.txsAvailable = make(chan struct{}, 1) -} - -// SetLogger sets the Logger. -func (mem *CListMempool) SetLogger(l log.Logger) { - mem.logger = l -} - -// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran before CheckTx. Only applies to the first created block. -// After that, Update overwrites the existing value. -func WithPreCheck(f PreCheckFunc) CListMempoolOption { - return func(mem *CListMempool) { mem.preCheck = f } -} - -// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran after CheckTx. Only applies to the first created block. -// After that, Update overwrites the existing value. -func WithPostCheck(f PostCheckFunc) CListMempoolOption { - return func(mem *CListMempool) { mem.postCheck = f } -} - -// WithMetrics sets the metrics. -func WithMetrics(metrics *Metrics) CListMempoolOption { - return func(mem *CListMempool) { mem.metrics = metrics } -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) Lock() { - mem.updateMtx.Lock() -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) Unlock() { - mem.updateMtx.Unlock() -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) Size() int { - return mem.txs.Len() -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) SizeBytes() int64 { - return atomic.LoadInt64(&mem.txsBytes) -} - -// Lock() must be help by the caller during execution. -func (mem *CListMempool) FlushAppConn() error { - return mem.proxyAppConn.Flush(context.TODO()) -} - -// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state. -func (mem *CListMempool) Flush() { - mem.updateMtx.RLock() - defer mem.updateMtx.RUnlock() - - _ = atomic.SwapInt64(&mem.txsBytes, 0) - mem.cache.Reset() - - for e := mem.txs.Front(); e != nil; e = e.Next() { - mem.txs.Remove(e) - e.DetachPrev() - } - - mem.txsMap.Range(func(key, _ interface{}) bool { - mem.txsMap.Delete(key) - return true - }) -} - -// TxsFront returns the first transaction in the ordered list for peer -// goroutines to call .NextWait() on. -// FIXME: leaking implementation details! -// -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) TxsFront() *clist.CElement { - return mem.txs.Front() -} - -// TxsWaitChan returns a channel to wait on transactions. It will be closed -// once the mempool is not empty (ie. the internal `mem.txs` has at least one -// element) -// -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) TxsWaitChan() <-chan struct{} { - return mem.txs.WaitChan() -} - -// It blocks if we're waiting on Update() or Reap(). -// cb: A callback from the CheckTx command. -// -// It gets called from another goroutine. -// -// CONTRACT: Either cb will get called, or err returned. -// -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTx( - tx types.Tx, - cb func(*abci.ResponseCheckTx), - txInfo TxInfo, -) error { - mem.updateMtx.RLock() - // use defer to unlock mutex because application (*local client*) might panic - defer mem.updateMtx.RUnlock() - - txSize := len(tx) - - if err := mem.isFull(txSize); err != nil { - return err - } - - if txSize > mem.config.MaxTxBytes { - return ErrTxTooLarge{ - Max: mem.config.MaxTxBytes, - Actual: txSize, - } - } - - if mem.preCheck != nil { - if err := mem.preCheck(tx); err != nil { - return ErrPreCheck{ - Reason: err, - } - } - } - - // NOTE: proxyAppConn may error if tx buffer is full - if err := mem.proxyAppConn.Error(); err != nil { - return err - } - - if !mem.cache.Push(tx) { // if the transaction already exists in the cache - // Record a new sender for a tx we've already seen. - // Note it's possible a tx is still in the cache but no longer in the mempool - // (eg. after committing a block, txs are removed from mempool but not cache), - // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(tx.Key()); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - memTx.senders.LoadOrStore(txInfo.SenderID, true) - // TODO: consider punishing peer for dups, - // its non-trivial since invalid txs can become valid, - // but they can spam the same tx with little cost to them atm. - } - return ErrTxInCache - } - - reqRes, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{Tx: tx}) - if err != nil { - return err - } - reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) - - return nil -} - -// Global callback that will be called after every ABCI response. -// Having a single global callback avoids needing to set a callback for each request. -// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who), -// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that -// include this information. If we're not in the midst of a recheck, this function will just return, -// so the request specific callback can do the work. -// -// When rechecking, we don't need the peerID, so the recheck callback happens -// here. -func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { - if mem.recheckCursor == nil { - return - } - - mem.metrics.RecheckTimes.Add(1) - mem.resCbRecheck(req, res) - - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) - mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) -} - -// Request specific callback that should be set on individual reqRes objects -// to incorporate local information when processing the response. -// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them. -// NOTE: alternatively, we could include this information in the ABCI request itself. -// -// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called -// when all other response processing is complete. -// -// Used in CheckTx to record PeerID who sent us the tx. -func (mem *CListMempool) reqResCb( - tx []byte, - peerID uint16, - peerP2PID p2p.ID, - externalCb func(*abci.ResponseCheckTx), -) func(res *abci.Response) { - return func(res *abci.Response) { - if mem.recheckCursor != nil { - // this should never happen - panic("recheck cursor is not nil in reqResCb") - } - - mem.resCbFirstTime(tx, peerID, peerP2PID, res) - - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) - mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) - - // passed in by the caller of CheckTx, eg. the RPC - if externalCb != nil { - externalCb(res.GetCheckTx()) - } - } -} - -// Called from: -// - resCbFirstTime (lock not held) if tx is valid -func (mem *CListMempool) addTx(memTx *mempoolTx) { - e := mem.txs.PushBack(memTx) - mem.txsMap.Store(memTx.tx.Key(), e) - atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) - mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) -} - -// Called from: -// - Update (lock held) if tx was committed -// - resCbRecheck (lock not held) if tx was invalidated -func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement) { - mem.txs.Remove(elem) - elem.DetachPrev() - mem.txsMap.Delete(tx.Key()) - atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) -} - -// RemoveTxByKey removes a transaction from the mempool by its TxKey index. -func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { - if e, ok := mem.txsMap.Load(txKey); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - if memTx != nil { - mem.removeTx(memTx.tx, e.(*clist.CElement)) - return nil - } - return errors.New("found empty transaction") - } - return errors.New("transaction not found") -} - -func (mem *CListMempool) isFull(txSize int) error { - var ( - memSize = mem.Size() - txsBytes = mem.SizeBytes() - ) - - if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { - return ErrMempoolIsFull{ - NumTxs: memSize, - MaxTxs: mem.config.Size, - TxsBytes: txsBytes, - MaxTxsBytes: mem.config.MaxTxsBytes, - } - } - - return nil -} - -// callback, which is called after the app checked the tx for the first time. -// -// The case where the app checks the tx for the second and subsequent times is -// handled by the resCbRecheck callback. -func (mem *CListMempool) resCbFirstTime( - tx []byte, - peerID uint16, - peerP2PID p2p.ID, - res *abci.Response, -) { - switch r := res.Value.(type) { - case *abci.Response_CheckTx: - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Check mempool isn't full again to reduce the chance of exceeding the - // limits. - if err := mem.isFull(len(tx)); err != nil { - // remove from cache (mempool might have a space later) - mem.cache.Remove(tx) - mem.logger.Error(err.Error()) - return - } - - memTx := &mempoolTx{ - height: mem.height, - gasWanted: r.CheckTx.GasWanted, - tx: tx, - } - memTx.senders.Store(peerID, true) - mem.addTx(memTx) - mem.logger.Debug( - "added good transaction", - "tx", types.Tx(tx).Hash(), - "res", r, - "height", memTx.height, - "total", mem.Size(), - ) - mem.notifyTxsAvailable() - } else { - // ignore bad transaction - mem.logger.Debug( - "rejected bad transaction", - "tx", types.Tx(tx).Hash(), - "peerID", peerP2PID, - "res", r, - "err", postCheckErr, - ) - mem.metrics.FailedTxs.Add(1) - - if !mem.config.KeepInvalidTxsInCache { - // remove from cache (it might be good later) - mem.cache.Remove(tx) - } - } - - default: - // ignore other messages - } -} - -// callback, which is called after the app rechecked the tx. -// -// The case where the app checks the tx for the first time is handled by the -// resCbFirstTime callback. -func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { - switch r := res.Value.(type) { - case *abci.Response_CheckTx: - tx := req.GetCheckTx().Tx - memTx := mem.recheckCursor.Value.(*mempoolTx) - - // Search through the remaining list of tx to recheck for a transaction that matches - // the one we received from the ABCI application. - for { - if bytes.Equal(tx, memTx.tx) { - // We've found a tx in the recheck list that matches the tx that we - // received from the ABCI application. - // Break, and use this transaction for further checks. - break - } - - mem.logger.Error( - "re-CheckTx transaction mismatch", - "got", types.Tx(tx), - "expected", memTx.tx, - ) - - if mem.recheckCursor == mem.recheckEnd { - // we reached the end of the recheckTx list without finding a tx - // matching the one we received from the ABCI application. - // Return without processing any tx. - mem.recheckCursor = nil - return - } - - mem.recheckCursor = mem.recheckCursor.Next() - memTx = mem.recheckCursor.Value.(*mempoolTx) - } - - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - - if (r.CheckTx.Code != abci.CodeTypeOK) || postCheckErr != nil { - // Tx became invalidated due to newly committed block. - mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) - mem.removeTx(tx, mem.recheckCursor) - // We remove the invalid tx from the cache because it might be good later - if !mem.config.KeepInvalidTxsInCache { - mem.cache.Remove(tx) - } - } - if mem.recheckCursor == mem.recheckEnd { - mem.recheckCursor = nil - } else { - mem.recheckCursor = mem.recheckCursor.Next() - } - if mem.recheckCursor == nil { - // Done! - mem.logger.Debug("done rechecking txs") - - // in case the recheck removed all txs - if mem.Size() > 0 { - mem.notifyTxsAvailable() - } - } - default: - // ignore other messages - } -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) TxsAvailable() <-chan struct{} { - return mem.txsAvailable -} - -func (mem *CListMempool) notifyTxsAvailable() { - mem.txAvailMtx.Lock() - defer mem.txAvailMtx.Unlock() - if mem.Size() == 0 { - panic("notified txs available but mempool is empty!") - } - if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { - // channel cap is 1, so this will send once - mem.notifiedTxsAvailable = true - select { - case mem.txsAvailable <- struct{}{}: - default: - } - } -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - mem.updateMtx.RLock() - defer mem.updateMtx.RUnlock() - - var ( - totalGas int64 - runningSize int64 - ) - - // TODO: we will get a performance boost if we have a good estimate of avg - // size per tx, and set the initial capacity based off of that. - // txs := make([]types.Tx, 0, cmtmath.MinInt(mem.txs.Len(), max/mem.avgTxSize)) - txs := make([]types.Tx, 0, mem.txs.Len()) - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - // Check total gas requirement and total size requirement. - // If maxGas is negative, skip this check. - // Since newTotalGas < masGas, which - // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted - totalDataSize := runningSize + types.ComputeProtoSizeForTxs([]types.Tx{memTx.tx}) - if (maxGas > -1 && newTotalGas > maxGas) || (maxBytes > -1 && totalDataSize > maxBytes) { - continue - } - totalGas = newTotalGas - runningSize = totalDataSize - txs = append(txs, memTx.tx) - } - return txs -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { - mem.updateMtx.RLock() - defer mem.updateMtx.RUnlock() - - if max < 0 { - max = mem.txs.Len() - } - - length := 0 - if mem.txs.Len() < max { - length = mem.txs.Len() - } else { - length = max - } - - txs := make([]types.Tx, 0, length) - for e := mem.txs.Front(); e != nil && len(txs) < max; e = e.Next() { - memTx := e.Value.(*mempoolTx) - txs = append(txs, memTx.tx) - } - return txs -} - -// Lock() must be help by the caller during execution. -func (mem *CListMempool) Update( - height uint64, - txs types.Txs, - txResults []*abci.ExecTxResult, - preCheck PreCheckFunc, - postCheck PostCheckFunc, -) error { - // Set height - mem.height = height - mem.notifiedTxsAvailable = false - - if preCheck != nil { - mem.preCheck = preCheck - } - if postCheck != nil { - mem.postCheck = postCheck - } - - for i, tx := range txs { - if txResults[i].Code == abci.CodeTypeOK { - // Add valid committed tx to the cache (if missing). - _ = mem.cache.Push(tx) - } else if !mem.config.KeepInvalidTxsInCache { - // Allow invalid transactions to be resubmitted. - mem.cache.Remove(tx) - } - - // Remove committed tx from the mempool. - // - // Note an evil proposer can drop valid txs! - // Mempool before: - // 100 -> 101 -> 102 - // Block, proposed by an evil proposer: - // 101 -> 102 - // Mempool after: - // 100 - // https://github.com/tendermint/tendermint/issues/3322. - if err := mem.RemoveTxByKey(tx.Key()); err != nil { - mem.logger.Debug("Committed transaction not in local mempool (not an error)", - "key", tx.Key(), - "error", err.Error()) - } - } - - // Either recheck non-committed txs to see if they became invalid - // or just notify there're some txs left. - if mem.Size() > 0 { - if mem.config.Recheck { - mem.logger.Debug("recheck txs", "numtxs", mem.Size(), "height", height) - mem.recheckTxs() - // At this point, mem.txs are being rechecked. - // mem.recheckCursor re-scans mem.txs and possibly removes some txs. - // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. - } else { - mem.notifyTxsAvailable() - } - } - - // Update metrics - mem.metrics.Size.Set(float64(mem.Size())) - mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) - - return nil -} - -func (mem *CListMempool) recheckTxs() { - if mem.Size() == 0 { - panic("recheckTxs is called, but the mempool is empty") - } - - mem.recheckCursor = mem.txs.Front() - mem.recheckEnd = mem.txs.Back() - - // Push txs to proxyAppConn - // NOTE: globalCb may be called concurrently. - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - _, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{ - Tx: memTx.tx, - Type: abci.CheckTxType_Recheck, - }) - if err != nil { - mem.logger.Error("recheckTx", err, "err") - return - } - } - - // In bool - senders sync.Map -} - -// Height returns the height for this transaction -func (memTx *mempoolTx) Height() uint64 { - return atomic.LoadUint64(&memTx.height) -} diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go deleted file mode 100644 index 60129c8b4c..0000000000 --- a/mempool/clist_mempool_test.go +++ /dev/null @@ -1,810 +0,0 @@ -package mempool - -import ( - "context" - "encoding/binary" - "fmt" - mrand "math/rand" - "os" - "path/filepath" - "testing" - "time" - - "github.com/cosmos/gogoproto/proto" - gogotypes "github.com/cosmos/gogoproto/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - abciclient "github.com/cometbft/cometbft/abci/client" - abciclimocks "github.com/cometbft/cometbft/abci/client/mocks" - "github.com/cometbft/cometbft/abci/example/kvstore" - abciserver "github.com/cometbft/cometbft/abci/server" - abci "github.com/cometbft/cometbft/abci/types" - "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/libs/log" - cmtos "github.com/cometbft/cometbft/libs/os" - cmtrand "github.com/cometbft/cometbft/libs/rand" - "github.com/cometbft/cometbft/libs/service" - "github.com/cometbft/cometbft/proxy" - "github.com/cometbft/cometbft/types" -) - -const ( - DefaultTestChainID = "test-chain" -) - -func ResetTestRoot(testName string) *config.Config { - return ResetTestRootWithChainID(testName, "") -} - -func ResetTestRootWithChainID(testName string, chainID string) *config.Config { - // create a unique, concurrency-safe test directory under os.TempDir() - rootDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s_", chainID, testName)) - if err != nil { - panic(err) - } - - config.EnsureRoot(rootDir) - - baseConfig := config.DefaultBaseConfig() - genesisFilePath := filepath.Join(rootDir, baseConfig.Genesis) - privKeyFilePath := filepath.Join(rootDir, baseConfig.PrivValidatorKey) - privStateFilePath := filepath.Join(rootDir, baseConfig.PrivValidatorState) - - if !cmtos.FileExists(genesisFilePath) { - if chainID == "" { - chainID = DefaultTestChainID - } - testGenesis := fmt.Sprintf(testGenesisFmt, chainID) - cmtos.MustWriteFile(genesisFilePath, []byte(testGenesis), 0644) - } - // we always overwrite the priv val - cmtos.MustWriteFile(privKeyFilePath, []byte(testPrivValidatorKey), 0644) - cmtos.MustWriteFile(privStateFilePath, []byte(testPrivValidatorState), 0644) - - config := config.TestConfig().SetRoot(rootDir) - return config -} - -var testGenesisFmt = `{ - "genesis_time": "2018-10-10T08:20:13.695936996Z", - "chain_id": "%s", - "initial_height": "1", - "consensus_params": { - "block": { - "max_bytes": "22020096", - "max_gas": "-1", - "time_iota_ms": "10" - }, - "evidence": { - "max_age_num_blocks": "100000", - "max_age_duration": "172800000000000", - "max_bytes": "1048576" - }, - "validator": { - "pub_key_types": [ - "ed25519" - ] - }, - "abci": { - "vote_extensions_enable_height": "0" - }, - "version": {} - }, - "validators": [ - { - "pub_key": { - "type": "tendermint/PubKeyEd25519", - "value":"AT/+aaL1eB0477Mud9JMm8Sh8BIvOYlPGC9KkIUmFaE=" - }, - "power": "10", - "name": "" - } - ], - "app_hash": "" -}` - -var testPrivValidatorKey = `{ - "address": "A3258DCBF45DCA0DF052981870F2D1441A36D145", - "pub_key": { - "type": "tendermint/PubKeyEd25519", - "value": "AT/+aaL1eB0477Mud9JMm8Sh8BIvOYlPGC9KkIUmFaE=" - }, - "priv_key": { - "type": "tendermint/PrivKeyEd25519", - "value": "EVkqJO/jIXp3rkASXfh9YnyToYXRXhBr6g9cQVxPFnQBP/5povV4HTjvsy530kybxKHwEi85iU8YL0qQhSYVoQ==" - } -}` - -var testPrivValidatorState = `{ - "height": "0", - "round": 0, - "step": 0 -}` - -// A cleanupFunc cleans up any config / test files created for a particular -// test. -type cleanupFunc func() - -func newMempoolWithAppMock(client abciclient.Client) (*CListMempool, cleanupFunc, error) { - conf := ResetTestRoot("mempool_test") - - mp, cu := newMempoolWithAppAndConfigMock(conf, client) - return mp, cu, nil -} - -func newMempoolWithAppAndConfigMock( - cfg *config.Config, - client abciclient.Client, -) (*CListMempool, cleanupFunc) { - appConnMem := client - appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) - err := appConnMem.Start() - if err != nil { - panic(err) - } - - mp := NewCListMempool(cfg.Mempool, appConnMem, 0) - mp.SetLogger(log.TestingLogger()) - - return mp, func() { os.RemoveAll(cfg.RootDir) } -} - -func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) { - conf := ResetTestRoot("mempool_test") - - mp, cu := newMempoolWithAppAndConfig(cc, conf) - return mp, cu -} - -func newMempoolWithAppAndConfig(cc proxy.ClientCreator, cfg *config.Config) (*CListMempool, cleanupFunc) { - appConnMem, _ := cc.NewABCIClient() - appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) - err := appConnMem.Start() - if err != nil { - panic(err) - } - - mp := NewCListMempool(cfg.Mempool, appConnMem, 0) - mp.SetLogger(log.TestingLogger()) - - return mp, func() { os.RemoveAll(cfg.RootDir) } -} - -func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { - timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) - select { - case <-ch: - t.Fatal("Expected not to fire") - case <-timer.C: - } -} - -func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { - timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) - select { - case <-ch: - case <-timer.C: - t.Fatal("Expected to fire") - } -} - -func checkTxs(t *testing.T, mp Mempool, count int, peerID uint16) types.Txs { - txs := make(types.Txs, count) - txInfo := TxInfo{SenderID: peerID} - for i := 0; i < count; i++ { - txBytes := kvstore.NewRandomTx(20) - txs[i] = txBytes - if err := mp.CheckTx(txBytes, nil, txInfo); err != nil { - // Skip invalid txs. - // TestMempoolFilters will fail otherwise. It asserts a number of txs - // returned. - if IsPreCheckError(err) { - continue - } - t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i) - } - } - return txs -} - -func TestReapMaxBytesMaxGas(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - // Ensure gas calculation behaves as expected - checkTxs(t, mp, 1, UnknownPeerID) - tx0 := mp.TxsFront().Value.(*mempoolTx) - require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly") - // ensure each tx is 20 bytes long - require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes") - mp.Flush() - - // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. - // each tx has 20 bytes - tests := []struct { - numTxsToCreate int - maxBytes int64 - maxGas int64 - expectedNumTxs int - }{ - {20, -1, -1, 20}, - {20, -1, 0, 0}, - {20, -1, 10, 10}, - {20, -1, 30, 20}, - {20, 0, -1, 0}, - {20, 0, 10, 0}, - {20, 10, 10, 0}, - {20, 24, 10, 1}, - {20, 240, 5, 5}, - {20, 240, -1, 10}, - {20, 240, 10, 10}, - {20, 240, 15, 10}, - {20, 20000, -1, 20}, - {20, 20000, 5, 5}, - {20, 20000, 30, 20}, - } - for tcIndex, tt := range tests { - checkTxs(t, mp, tt.numTxsToCreate, UnknownPeerID) - got := mp.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas) - assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d", - len(got), tt.expectedNumTxs, tcIndex) - mp.Flush() - } -} - -func TestTxMempoolTxLargerThanMaxBytes(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - txmp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - // large high priority tx - bigTx := kvstore.NewRandomTx(100) - smallTx := kvstore.NewRandomTx(20) - require.NoError(t, txmp.CheckTx(bigTx, nil, TxInfo{SenderID: 1})) - require.NoError(t, txmp.CheckTx(smallTx, nil, TxInfo{SenderID: 2})) - - // reap by max bytes less than the large tx - reapedTxs := txmp.ReapMaxBytesMaxGas(100, -1) - require.Len(t, reapedTxs, 1) - require.Equal(t, types.Tx(smallTx), reapedTxs[0]) -} - -func TestMempoolFilters(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - emptyTxArr := []types.Tx{[]byte{}} - - nopPreFilter := func(tx types.Tx) error { return nil } - nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil } - - // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. - // each tx has 20 bytes - tests := []struct { - numTxsToCreate int - preFilter PreCheckFunc - postFilter PostCheckFunc - expectedNumTxs int - }{ - {10, nopPreFilter, nopPostFilter, 10}, - {10, PreCheckMaxBytes(10), nopPostFilter, 0}, - {10, PreCheckMaxBytes(22), nopPostFilter, 10}, - {10, nopPreFilter, PostCheckMaxGas(-1), 10}, - {10, nopPreFilter, PostCheckMaxGas(0), 0}, - {10, nopPreFilter, PostCheckMaxGas(1), 10}, - {10, nopPreFilter, PostCheckMaxGas(3000), 10}, - {10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0}, - {10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10}, - {10, PreCheckMaxBytes(22), PostCheckMaxGas(1), 10}, - {10, PreCheckMaxBytes(22), PostCheckMaxGas(0), 0}, - } - for tcIndex, tt := range tests { - err := mp.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) - require.NoError(t, err) - checkTxs(t, mp, tt.numTxsToCreate, UnknownPeerID) - require.Equal(t, tt.expectedNumTxs, mp.Size(), "mempool had the incorrect size, on test case %d", tcIndex) - mp.Flush() - } -} - -func TestMempoolUpdate(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - // 1. Adds valid txs to the cache - { - tx1 := kvstore.NewTxFromID(1) - err := mp.Update(1, []types.Tx{tx1}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - err = mp.CheckTx(tx1, nil, TxInfo{}) - if assert.Error(t, err) { - assert.Equal(t, ErrTxInCache, err) - } - } - - // 2. Removes valid txs from the mempool - { - tx2 := kvstore.NewTxFromID(2) - err := mp.CheckTx(tx2, nil, TxInfo{}) - require.NoError(t, err) - err = mp.Update(1, []types.Tx{tx2}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - assert.Zero(t, mp.Size()) - } - - // 3. Removes invalid transactions from the cache and the mempool (if present) - { - tx3 := kvstore.NewTxFromID(3) - err := mp.CheckTx(tx3, nil, TxInfo{}) - require.NoError(t, err) - err = mp.Update(1, []types.Tx{tx3}, abciResponses(1, 1), nil, nil) - require.NoError(t, err) - assert.Zero(t, mp.Size()) - - err = mp.CheckTx(tx3, nil, TxInfo{}) - require.NoError(t, err) - } -} - -func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) { - var callback abciclient.Callback - mockClient := new(abciclimocks.Client) - mockClient.On("Start").Return(nil) - mockClient.On("SetLogger", mock.Anything) - - mockClient.On("Error").Return(nil).Times(4) - mockClient.On("SetResponseCallback", mock.MatchedBy(func(cb abciclient.Callback) bool { callback = cb; return true })) - - mp, cleanup, err := newMempoolWithAppMock(mockClient) - require.NoError(t, err) - defer cleanup() - - // Add 4 transactions to the mempool by calling the mempool's `CheckTx` on each of them. - txs := []types.Tx{[]byte{0x01}, []byte{0x02}, []byte{0x03}, []byte{0x04}} - for _, tx := range txs { - reqRes := abciclient.NewReqRes(abci.ToRequestCheckTx(&abci.RequestCheckTx{Tx: tx})) - reqRes.Response = abci.ToResponseCheckTx(&abci.ResponseCheckTx{Code: abci.CodeTypeOK}) - - mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil) - err := mp.CheckTx(tx, nil, TxInfo{}) - require.NoError(t, err) - - // ensure that the callback that the mempool sets on the ReqRes is run. - reqRes.InvokeCallback() - } - - // Calling update to remove the first transaction from the mempool. - // This call also triggers the mempool to recheck its remaining transactions. - err = mp.Update(0, []types.Tx{txs[0]}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.Nil(t, err) - - // The mempool has now sent its requests off to the client to be rechecked - // and is waiting for the corresponding callbacks to be called. - // We now call the mempool-supplied callback on the first and third transaction. - // This simulates the client dropping the second request. - // Previous versions of this code panicked when the ABCI application missed - // a recheck-tx request. - resp := &abci.ResponseCheckTx{Code: abci.CodeTypeOK} - req := &abci.RequestCheckTx{Tx: txs[1]} - callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp)) - - req = &abci.RequestCheckTx{Tx: txs[3]} - callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp)) - mockClient.AssertExpectations(t) -} - -func TestMempool_KeepInvalidTxsInCache(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - wcfg := config.DefaultConfig() - wcfg.Mempool.KeepInvalidTxsInCache = true - mp, cleanup := newMempoolWithAppAndConfig(cc, wcfg) - defer cleanup() - - // 1. An invalid transaction must remain in the cache after Update - { - a := make([]byte, 8) - binary.BigEndian.PutUint64(a, 0) - - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, 1) - - err := mp.CheckTx(b, nil, TxInfo{}) - require.NoError(t, err) - - // simulate new block - _, err = app.FinalizeBlock(context.Background(), &abci.RequestFinalizeBlock{ - Txs: [][]byte{a, b}, - }) - require.NoError(t, err) - err = mp.Update(1, []types.Tx{a, b}, - []*abci.ExecTxResult{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil) - require.NoError(t, err) - - // a must be added to the cache - err = mp.CheckTx(a, nil, TxInfo{}) - if assert.Error(t, err) { - assert.Equal(t, ErrTxInCache, err) - } - - // b must remain in the cache - err = mp.CheckTx(b, nil, TxInfo{}) - if assert.Error(t, err) { - assert.Equal(t, ErrTxInCache, err) - } - } - - // 2. An invalid transaction must remain in the cache - { - a := make([]byte, 8) - binary.BigEndian.PutUint64(a, 0) - - // remove a from the cache to test (2) - mp.cache.Remove(a) - - err := mp.CheckTx(a, nil, TxInfo{}) - require.NoError(t, err) - } -} - -func TestTxsAvailable(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - mp.EnableTxsAvailable() - - timeoutMS := 500 - - // with no txs, it shouldn't fire - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // send a bunch of txs, it should only fire once - txs := checkTxs(t, mp, 100, UnknownPeerID) - ensureFire(t, mp.TxsAvailable(), timeoutMS) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // call update with half the txs. - // it should fire once now for the new height - // since there are still txs left - committedTxs, remainingTxs := txs[:50], txs[50:] - if err := mp.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { - t.Error(err) - } - ensureFire(t, mp.TxsAvailable(), timeoutMS) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // send a bunch more txs. we already fired for this height so it shouldn't fire again - moreTxs := checkTxs(t, mp, 50, UnknownPeerID) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // now call update with all the txs. it should not fire as there are no txs left - committedTxs = append(remainingTxs, moreTxs...) - if err := mp.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { - t.Error(err) - } - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // send a bunch more txs, it should only fire once - checkTxs(t, mp, 100, UnknownPeerID) - ensureFire(t, mp.TxsAvailable(), timeoutMS) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) -} - -func TestSerialReap(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - appConnCon, _ := cc.NewABCIClient() - appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) - err := appConnCon.Start() - require.Nil(t, err) - - cacheMap := make(map[string]struct{}) - deliverTxsRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - txBytes := kvstore.NewTx(fmt.Sprintf("%d", i), "true") - err := mp.CheckTx(txBytes, nil, TxInfo{}) - _, cached := cacheMap[string(txBytes)] - if cached { - require.NotNil(t, err, "expected error for cached tx") - } else { - require.Nil(t, err, "expected no err for uncached tx") - } - cacheMap[string(txBytes)] = struct{}{} - - // Duplicates are cached and should return error - err = mp.CheckTx(txBytes, nil, TxInfo{}) - require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") - } - } - - reapCheck := func(exp int) { - txs := mp.ReapMaxBytesMaxGas(-1, -1) - require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs))) - } - - updateRange := func(start, end int) { - txs := make(types.Txs, end-start) - for i := start; i < end; i++ { - txs[i-start] = kvstore.NewTx(fmt.Sprintf("%d", i), "true") - } - if err := mp.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { - t.Error(err) - } - } - - commitRange := func(start, end int) { - // Deliver some txs in a block - txs := make([][]byte, end-start) - for i := start; i < end; i++ { - txs[i-start] = kvstore.NewTx(fmt.Sprintf("%d", i), "true") - } - - res, err := appConnCon.FinalizeBlock(context.Background(), &abci.RequestFinalizeBlock{Txs: txs}) - if err != nil { - t.Errorf("client error committing tx: %v", err) - } - for _, txResult := range res.TxResults { - if txResult.IsErr() { - t.Errorf("error committing tx. Code:%v result:%X log:%v", - txResult.Code, txResult.Data, txResult.Log) - } - } - if len(res.AppHash) != 8 { - t.Errorf("error committing. Hash:%X", res.AppHash) - } - - _, err = appConnCon.Commit(context.Background(), &abci.RequestCommit{}) - if err != nil { - t.Errorf("client error committing: %v", err) - } - } - - //---------------------------------------- - - // Deliver some txs. - deliverTxsRange(0, 100) - - // Reap the txs. - reapCheck(100) - - // Reap again. We should get the same amount - reapCheck(100) - - // Deliver 0 to 999, we should reap 900 new txs - // because 100 were already counted. - deliverTxsRange(0, 1000) - - // Reap the txs. - reapCheck(1000) - - // Reap again. We should get the same amount - reapCheck(1000) - - // Commit from the consensus AppConn - commitRange(0, 500) - updateRange(0, 500) - - // We should have 500 left. - reapCheck(500) - - // Deliver 100 invalid txs and 100 valid txs - deliverTxsRange(900, 1100) - - // We should have 600 now. - reapCheck(600) -} - -func TestMempool_CheckTxChecksTxSize(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - - mempl, cleanup := newMempoolWithApp(cc) - defer cleanup() - - maxTxSize := mempl.config.MaxTxBytes - - testCases := []struct { - len int - err bool - }{ - // check small txs. no error - 0: {10, false}, - 1: {1000, false}, - 2: {1000000, false}, - - // check around maxTxSize - 3: {maxTxSize - 1, false}, - 4: {maxTxSize, false}, - 5: {maxTxSize + 1, true}, - } - - for i, testCase := range testCases { - caseString := fmt.Sprintf("case %d, len %d", i, testCase.len) - - tx := cmtrand.Bytes(testCase.len) - - err := mempl.CheckTx(tx, nil, TxInfo{}) - bv := gogotypes.BytesValue{Value: tx} - bz, err2 := bv.Marshal() - require.NoError(t, err2) - require.Equal(t, len(bz), proto.Size(&bv), caseString) - - if !testCase.err { - require.NoError(t, err, caseString) - } else { - require.Equal(t, err, ErrTxTooLarge{ - Max: maxTxSize, - Actual: testCase.len, - }, caseString) - } - } -} - -func TestMempoolTxsBytes(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - - cfg := ResetTestRoot("mempool_test") - - cfg.Mempool.MaxTxsBytes = 100 - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - // 1. zero by default - assert.EqualValues(t, 0, mp.SizeBytes()) - - // 2. len(tx) after CheckTx - tx1 := kvstore.NewRandomTx(10) - err := mp.CheckTx(tx1, nil, TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 10, mp.SizeBytes()) - - // 3. zero again after tx is removed by Update - err = mp.Update(1, []types.Tx{tx1}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - assert.EqualValues(t, 0, mp.SizeBytes()) - - // 4. zero after Flush - tx2 := kvstore.NewRandomTx(20) - err = mp.CheckTx(tx2, nil, TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 20, mp.SizeBytes()) - - mp.Flush() - assert.EqualValues(t, 0, mp.SizeBytes()) - - // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached. - tx3 := kvstore.NewRandomTx(100) - err = mp.CheckTx(tx3, nil, TxInfo{}) - require.NoError(t, err) - - tx4 := kvstore.NewRandomTx(10) - err = mp.CheckTx(tx4, nil, TxInfo{}) - if assert.Error(t, err) { - assert.IsType(t, ErrMempoolIsFull{}, err) - } - - // 6. zero after tx is rechecked and removed due to not being valid anymore - app2 := kvstore.NewInMemoryApplication() - cc = proxy.NewLocalClientCreator(app2) - - mp, cleanup = newMempoolWithApp(cc) - defer cleanup() - - txBytes := kvstore.NewRandomTx(10) - - err = mp.CheckTx(txBytes, nil, TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 10, mp.SizeBytes()) - - appConnCon, _ := cc.NewABCIClient() - appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) - err = appConnCon.Start() - require.Nil(t, err) - t.Cleanup(func() { - if err := appConnCon.Stop(); err != nil { - t.Error(err) - } - }) - - res, err := appConnCon.FinalizeBlock(context.Background(), &abci.RequestFinalizeBlock{Txs: [][]byte{txBytes}}) - require.NoError(t, err) - require.EqualValues(t, 0, res.TxResults[0].Code) - require.NotEmpty(t, res.AppHash) - - _, err = appConnCon.Commit(context.Background(), &abci.RequestCommit{}) - require.NoError(t, err) - - // Pretend like we committed nothing so txBytes gets rechecked and removed. - err = mp.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - assert.EqualValues(t, 10, mp.SizeBytes()) - - // 7. Test RemoveTxByKey function - err = mp.CheckTx(tx1, nil, TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 20, mp.SizeBytes()) - assert.Error(t, mp.RemoveTxByKey(types.Tx([]byte{0x07}).Key())) - assert.EqualValues(t, 20, mp.SizeBytes()) - assert.NoError(t, mp.RemoveTxByKey(types.Tx(tx1).Key())) - assert.EqualValues(t, 10, mp.SizeBytes()) -} - -// This will non-deterministically catch some concurrency failures like -// https://github.com/tendermint/tendermint/issues/3509 -// TODO: all of the tests should probably also run using the remote proxy app -// since otherwise we're not actually testing the concurrency of the mempool here! -func TestMempoolRemoteAppConcurrency(t *testing.T) { - sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", cmtrand.Str(6)) - app := kvstore.NewInMemoryApplication() - _, server := newRemoteApp(t, sockPath, app) - t.Cleanup(func() { - if err := server.Stop(); err != nil { - t.Error(err) - } - }) - - cfg := ResetTestRoot("mempool_test") - - mp, cleanup := newMempoolWithAppAndConfig(proxy.NewRemoteClientCreator(sockPath, "socket", true), cfg) - defer cleanup() - - // generate small number of txs - nTxs := 10 - txLen := 200 - txs := make([]types.Tx, nTxs) - for i := 0; i < nTxs; i++ { - txs[i] = kvstore.NewRandomTx(txLen) - } - - // simulate a group of peers sending them over and over - N := cfg.Mempool.Size - maxPeers := 5 - for i := 0; i < N; i++ { - peerID := mrand.Intn(maxPeers) - txNum := mrand.Intn(nTxs) - tx := txs[txNum] - - // this will err with ErrTxInCache many times ... - mp.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)}) //nolint: errcheck // will error - } - - require.NoError(t, mp.FlushAppConn()) -} - -// caller must close server -func newRemoteApp(t *testing.T, addr string, app abci.Application) (abciclient.Client, service.Service) { - clientCreator, err := abciclient.NewClient(addr, "socket", true) - require.NoError(t, err) - - // Start server - server := abciserver.NewSocketServer(addr, app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := server.Start(); err != nil { - t.Fatalf("Error starting socket server: %v", err.Error()) - } - - return clientCreator, server -} - -func abciResponses(n int, code uint32) []*abci.ExecTxResult { - responses := make([]*abci.ExecTxResult, 0, n) - for i := 0; i < n; i++ { - responses = append(responses, &abci.ExecTxResult{Code: code}) - } - return responses -} diff --git a/mempool/mempool.go b/mempool/mempool.go deleted file mode 100644 index fbac8ff470..0000000000 --- a/mempool/mempool.go +++ /dev/null @@ -1,195 +0,0 @@ -package mempool - -import ( - "crypto/sha256" - "errors" - "fmt" - "math" - - abci "github.com/cometbft/cometbft/abci/types" - "github.com/cometbft/cometbft/types" -) - -const ( - MempoolChannel = byte(0x30) - - // PeerCatchupSleepIntervalMS defines how much time to sleep if a peer is behind - PeerCatchupSleepIntervalMS = 100 - - // UnknownPeerID is the peer ID to use when running CheckTx when there is - // no peer (e.g. RPC) - UnknownPeerID uint16 = 0 - - MaxActiveIDs = math.MaxUint16 -) - -//go:generate ../scripts/mockery_generate.sh Mempool - -// Mempool defines the mempool interface. -// -// Updates to the mempool need to be synchronized with committing a block so -// applications can reset their transient state on Commit. -type Mempool interface { - // CheckTx executes a new transaction against the application to determine - // its validity and whether it should be added to the mempool. - CheckTx(tx types.Tx, callback func(*abci.ResponseCheckTx), txInfo TxInfo) error - - // RemoveTxByKey removes a transaction, identified by its key, - // from the mempool. - RemoveTxByKey(txKey types.TxKey) error - - // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes - // bytes total with the condition that the total gasWanted must be less than - // maxGas. - // - // If both maxes are negative, there is no cap on the size of all returned - // transactions (~ all available transactions). - ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs - - // ReapMaxTxs reaps up to max transactions from the mempool. If max is - // negative, there is no cap on the size of all returned transactions - // (~ all available transactions). - ReapMaxTxs(max int) types.Txs - - // Lock locks the mempool. The consensus must be able to hold lock to safely - // update. - Lock() - - // Unlock unlocks the mempool. - Unlock() - - // Update informs the mempool that the given txs were committed and can be - // discarded. - // - // NOTE: - // 1. This should be called *after* block is committed by consensus. - // 2. Lock/Unlock must be managed by the caller. - Update( - blockHeight uint64, - blockTxs types.Txs, - deliverTxResponses []*abci.ExecTxResult, - newPreFn PreCheckFunc, - newPostFn PostCheckFunc, - ) error - - // FlushAppConn flushes the mempool connection to ensure async callback calls - // are done, e.g. from CheckTx. - // - // NOTE: - // 1. Lock/Unlock must be managed by caller. - FlushAppConn() error - - // Flush removes all transactions from the mempool and caches. - Flush() - - // TxsAvailable returns a channel which fires once for every height, and only - // when transactions are available in the mempool. - // - // NOTE: - // 1. The returned channel may be nil if EnableTxsAvailable was not called. - TxsAvailable() <-chan struct{} - - // EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will - // trigger once every height when transactions are available. - EnableTxsAvailable() - - // Size returns the number of transactions in the mempool. - Size() int - - // SizeBytes returns the total size of all txs in the mempool. - SizeBytes() int64 -} - -// PreCheckFunc is an optional filter executed before CheckTx and rejects -// transaction if false is returned. An example would be to ensure that a -// transaction doesn't exceeded the block size. -type PreCheckFunc func(types.Tx) error - -// PostCheckFunc is an optional filter executed after CheckTx and rejects -// transaction if false is returned. An example would be to ensure a -// transaction doesn't require more gas than available for the block. -type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error - -// PreCheckMaxBytes checks that the size of the transaction is smaller or equal -// to the expected maxBytes. -func PreCheckMaxBytes(maxBytes int64) PreCheckFunc { - return func(tx types.Tx) error { - txSize := types.ComputeProtoSizeForTxs([]types.Tx{tx}) - - if txSize > maxBytes { - return fmt.Errorf("tx size is too big: %d, max: %d", txSize, maxBytes) - } - - return nil - } -} - -// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed -// maxGas. Returns nil if maxGas is -1. -func PostCheckMaxGas(maxGas int64) PostCheckFunc { - return func(tx types.Tx, res *abci.ResponseCheckTx) error { - if maxGas == -1 { - return nil - } - if res.GasWanted < 0 { - return fmt.Errorf("gas wanted %d is negative", - res.GasWanted) - } - if res.GasWanted > maxGas { - return fmt.Errorf("gas wanted %d is greater than max gas %d", - res.GasWanted, maxGas) - } - - return nil - } -} - -// ErrTxInCache is returned to the client if we saw tx earlier -var ErrTxInCache = errors.New("tx already exists in cache") - -// TxKey is the fixed length array key used as an index. -type TxKey [sha256.Size]byte - -// ErrTxTooLarge defines an error when a transaction is too big to be sent in a -// message to other peers. -type ErrTxTooLarge struct { - Max int - Actual int -} - -func (e ErrTxTooLarge) Error() string { - return fmt.Sprintf("Tx too large. Max size is %d, but got %d", e.Max, e.Actual) -} - -// ErrMempoolIsFull defines an error where CometBFT and the application cannot -// handle that much load. -type ErrMempoolIsFull struct { - NumTxs int - MaxTxs int - TxsBytes int64 - MaxTxsBytes int64 -} - -func (e ErrMempoolIsFull) Error() string { - return fmt.Sprintf( - "mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)", - e.NumTxs, - e.MaxTxs, - e.TxsBytes, - e.MaxTxsBytes, - ) -} - -// ErrPreCheck defines an error where a transaction fails a pre-check. -type ErrPreCheck struct { - Reason error -} - -func (e ErrPreCheck) Error() string { - return e.Reason.Error() -} - -// IsPreCheckError returns true if err is due to pre check failure. -func IsPreCheckError(err error) bool { - return errors.As(err, &ErrPreCheck{}) -} diff --git a/mempool/mempool.md b/mempool/mempool.md deleted file mode 100644 index 08961903a9..0000000000 --- a/mempool/mempool.md +++ /dev/null @@ -1,33 +0,0 @@ -# Mempool - -## Abstract - -The mempool module stores transactions which have not yet been included in a block, and provides an interface to check the validity of incoming transactions. It's defined by an interface [here](https://github.com/rollkit/rollkit/blob/main/mempool/mempool.go#L32), with an implementation [here](https://github.com/rollkit/rollkit/blob/main/mempool/mempool.go). - -## Component Description - -Full nodes instantiate a mempool [here](https://github.com/rollkit/rollkit/blob/main/node/full.go#L96). A `p2p.GossipValidator` is constructed from the node's mempool [here](https://github.com/rollkit/rollkit/blob/main/node/full.go#L465), which is used by Rollkit's P2P code to deal with peers who gossip invalid transactions. The mempool is also passed into the [block manager constructor](https://github.com/rollkit/rollkit/blob/main/node/full.go#L158), which creates a [`BlockExecutor`](https://github.com/rollkit/rollkit/blob/main/block/manager.go#L218) from the mempool. - -The [`BlockExecutor`](https://github.com/rollkit/rollkit/blob/main/state/block-executor.md) calls `ReapMaxBytesMaxGas` in [`CreateBlock`](https://github.com/rollkit/rollkit/blob/main/state/executor.go#L95) to get transactions from the pool for the new block. When `commit` is called, the `BlockExecutor` calls [`Update(...)`](https://github.com/rollkit/rollkit/blob/main/state/executor.go#L318) on the mempool, removing the old transactions from the pool. - -## Communication - -Several RPC methods query the mempool module: [`BroadcastTxCommit`](https://github.com/rollkit/rollkit/blob/main/node/full_client.go#L92), [`BroadcastTxAsync`](https://github.com/rollkit/rollkit/blob/main/node/full_client.go#L186), [`BroadcastTxSync`](https://github.com/rollkit/rollkit/blob/main/node/full_client.go#L202) call the mempool's `CheckTx(...)` method. - -## Interface - -| Function Name | Input Arguments | Output Type | Intended Behavior | -|---------------------|---------------------------------------------|------------------|------------------------------------------------------------------| -| CheckTx | tx types.Tx, callback func(*abci.Response), txInfo TxInfo | error | Executes a new transaction against the application to determine its validity and whether it should be added to the mempool. | -| RemoveTxByKey | txKey types.TxKey | error | Removes a transaction, identified by its key, from the mempool. | -| ReapMaxBytesMaxGas | maxBytes, maxGas int64 | types.Txs | Reaps transactions from the mempool up to maxBytes bytes total with the condition that the total gasWanted must be less than maxGas. If both maxes are negative, there is no cap on the size of all returned transactions (~ all available transactions). | -| ReapMaxTxs | max int | types.Txs | Reaps up to max transactions from the mempool. If max is negative, there is no cap on the size of all returned transactions (~ all available transactions). | -| Lock | N/A | N/A | Locks the mempool. The consensus must be able to hold the lock to safely update. | -| Unlock | N/A | N/A | Unlocks the mempool. | -| Update | blockHeight uint64, blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn PreCheckFunc, newPostFn PostCheckFunc | error | Informs the mempool that the given txs were committed and can be discarded. This should be called *after* block is committed by consensus. Lock/Unlock must be managed by the caller. | -| FlushAppConn | N/A | error | Flushes the mempool connection to ensure async callback calls are done, e.g., from CheckTx. Lock/Unlock must be managed by the caller. | -| Flush | N/A | N/A | Removes all transactions from the mempool and caches. | -| TxsAvailable | N/A | <-chan struct{} | Returns a channel which fires once for every height when transactions are available in the mempool. The returned channel may be nil if EnableTxsAvailable was not called. | -| EnableTxsAvailable | N/A | N/A | Initializes the TxsAvailable channel, ensuring it will trigger once every height when transactions are available. | -| Size | N/A | int | Returns the number of transactions in the mempool. | -| SizeBytes | N/A | int64 | Returns the total size of all txs in the mempool. | diff --git a/mempool/metrics.go b/mempool/metrics.go deleted file mode 100644 index d118360ad7..0000000000 --- a/mempool/metrics.go +++ /dev/null @@ -1,119 +0,0 @@ -package mempool - -import ( - "github.com/go-kit/kit/metrics" - "github.com/go-kit/kit/metrics/discard" - "github.com/go-kit/kit/metrics/prometheus" - stdprometheus "github.com/prometheus/client_golang/prometheus" -) - -const ( - // MetricsSubsystem is a subsystem shared by all metrics exposed by this - // package. - MetricsSubsystem = "mempool" -) - -// Metrics contains metrics exposed by this package. -// see MetricsProvider for descriptions. -type Metrics struct { - // Size of the mempool. - Size metrics.Gauge - - // Total size of the mempool in bytes. - SizeBytes metrics.Gauge - - // Histogram of transaction sizes, in bytes. - TxSizeBytes metrics.Histogram - - // Number of failed transactions. - FailedTxs metrics.Counter - - // RejectedTxs defines the number of rejected transactions. These are - // transactions that passed CheckTx but failed to make it into the mempool - // due to resource limits, e.g. mempool is full and no lower priority - // transactions exist in the mempool. - RejectedTxs metrics.Counter - - // EvictedTxs defines the number of evicted transactions. These are valid - // transactions that passed CheckTx and existed in the mempool but were later - // evicted to make room for higher priority valid transactions that passed - // CheckTx. - EvictedTxs metrics.Counter - - // Number of times transactions are rechecked in the mempool. - RecheckTimes metrics.Counter -} - -// PrometheusMetrics returns Metrics build using Prometheus client library. -// Optionally, labels can be provided along with their values ("foo", -// "fooValue"). -func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { - labels := []string{} - for i := 0; i < len(labelsAndValues); i += 2 { - labels = append(labels, labelsAndValues[i]) - } - return &Metrics{ - Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "size", - Help: "Size of the mempool (number of uncommitted transactions).", - }, labels).With(labelsAndValues...), - - SizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "size_bytes", - Help: "Total size of the mempool in bytes.", - }, labels).With(labelsAndValues...), - - TxSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "tx_size_bytes", - Help: "Transaction sizes in bytes.", - Buckets: stdprometheus.ExponentialBuckets(1, 3, 17), - }, labels).With(labelsAndValues...), - - FailedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "failed_txs", - Help: "Number of failed transactions.", - }, labels).With(labelsAndValues...), - - RejectedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "rejected_txs", - Help: "Number of rejected transactions.", - }, labels).With(labelsAndValues...), - - EvictedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "evicted_txs", - Help: "Number of evicted transactions.", - }, labels).With(labelsAndValues...), - - RecheckTimes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "recheck_times", - Help: "Number of times transactions are rechecked in the mempool.", - }, labels).With(labelsAndValues...), - } -} - -// NopMetrics returns no-op Metrics. -func NopMetrics() *Metrics { - return &Metrics{ - Size: discard.NewGauge(), - SizeBytes: discard.NewGauge(), - TxSizeBytes: discard.NewHistogram(), - FailedTxs: discard.NewCounter(), - RejectedTxs: discard.NewCounter(), - EvictedTxs: discard.NewCounter(), - RecheckTimes: discard.NewCounter(), - } -} diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go deleted file mode 100644 index 7573c58e97..0000000000 --- a/mempool/mock/mempool.go +++ /dev/null @@ -1,184 +0,0 @@ -// Code generated by mockery. DO NOT EDIT. - -package mocks - -import ( - abcitypes "github.com/cometbft/cometbft/abci/types" - mempool "github.com/cometbft/cometbft/mempool" - - mock "github.com/stretchr/testify/mock" - - types "github.com/cometbft/cometbft/types" -) - -// Mempool is an autogenerated mock type for the Mempool type -type Mempool struct { - mock.Mock -} - -// CheckTx provides a mock function with given fields: tx, callback, txInfo -func (_m *Mempool) CheckTx(tx types.Tx, callback func(*abcitypes.ResponseCheckTx), txInfo mempool.TxInfo) error { - ret := _m.Called(tx, callback, txInfo) - - var r0 error - if rf, ok := ret.Get(0).(func(types.Tx, func(*abcitypes.ResponseCheckTx), mempool.TxInfo) error); ok { - r0 = rf(tx, callback, txInfo) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// EnableTxsAvailable provides a mock function with given fields: -func (_m *Mempool) EnableTxsAvailable() { - _m.Called() -} - -// Flush provides a mock function with given fields: -func (_m *Mempool) Flush() { - _m.Called() -} - -// FlushAppConn provides a mock function with given fields: -func (_m *Mempool) FlushAppConn() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Lock provides a mock function with given fields: -func (_m *Mempool) Lock() { - _m.Called() -} - -// ReapMaxBytesMaxGas provides a mock function with given fields: maxBytes, maxGas -func (_m *Mempool) ReapMaxBytesMaxGas(maxBytes int64, maxGas int64) types.Txs { - ret := _m.Called(maxBytes, maxGas) - - var r0 types.Txs - if rf, ok := ret.Get(0).(func(int64, int64) types.Txs); ok { - r0 = rf(maxBytes, maxGas) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(types.Txs) - } - } - - return r0 -} - -// ReapMaxTxs provides a mock function with given fields: max -func (_m *Mempool) ReapMaxTxs(max int) types.Txs { - ret := _m.Called(max) - - var r0 types.Txs - if rf, ok := ret.Get(0).(func(int) types.Txs); ok { - r0 = rf(max) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(types.Txs) - } - } - - return r0 -} - -// RemoveTxByKey provides a mock function with given fields: txKey -func (_m *Mempool) RemoveTxByKey(txKey types.TxKey) error { - ret := _m.Called(txKey) - - var r0 error - if rf, ok := ret.Get(0).(func(types.TxKey) error); ok { - r0 = rf(txKey) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Size provides a mock function with given fields: -func (_m *Mempool) Size() int { - ret := _m.Called() - - var r0 int - if rf, ok := ret.Get(0).(func() int); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int) - } - - return r0 -} - -// SizeBytes provides a mock function with given fields: -func (_m *Mempool) SizeBytes() int64 { - ret := _m.Called() - - var r0 int64 - if rf, ok := ret.Get(0).(func() int64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int64) - } - - return r0 -} - -// TxsAvailable provides a mock function with given fields: -func (_m *Mempool) TxsAvailable() <-chan struct{} { - ret := _m.Called() - - var r0 <-chan struct{} - if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan struct{}) - } - } - - return r0 -} - -// Unlock provides a mock function with given fields: -func (_m *Mempool) Unlock() { - _m.Called() -} - -// Update provides a mock function with given fields: blockHeight, blockTxs, deliverTxResponses, newPreFn, newPostFn -func (_m *Mempool) Update(blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abcitypes.ExecTxResult, newPreFn mempool.PreCheckFunc, newPostFn mempool.PostCheckFunc) error { - ret := _m.Called(blockHeight, blockTxs, deliverTxResponses, newPreFn, newPostFn) - - var r0 error - if rf, ok := ret.Get(0).(func(int64, types.Txs, []*abcitypes.ExecTxResult, mempool.PreCheckFunc, mempool.PostCheckFunc) error); ok { - r0 = rf(blockHeight, blockTxs, deliverTxResponses, newPreFn, newPostFn) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -type mockConstructorTestingTNewMempool interface { - mock.TestingT - Cleanup(func()) -} - -// NewMempool creates a new instance of Mempool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMempool(t mockConstructorTestingTNewMempool) *Mempool { - mock := &Mempool{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/mempool/reaper.go b/mempool/reaper.go deleted file mode 100644 index d059eb2bd5..0000000000 --- a/mempool/reaper.go +++ /dev/null @@ -1,113 +0,0 @@ -package mempool - -import ( - "context" - "sync" - "time" - - cmtypes "github.com/cometbft/cometbft/types" - - "github.com/cometbft/cometbft/libs/log" - "github.com/rollkit/go-sequencing" - "github.com/rollkit/go-sequencing/proxy/grpc" -) - -// ReapInterval is the interval at which the reaper checks the mempool for transactions to reap. -const ( - ReapInterval time.Duration = 1 * time.Second - MaxRetries int = 3 - RetryDelay time.Duration = 2 * time.Second -) - -// CListMempoolReaper is a reaper that reaps transactions from the mempool and sends them to the gRPC server. -type CListMempoolReaper struct { - mempool Mempool - stopCh chan struct{} - grpcClient *grpc.Client - rollupId []byte - submitted map[cmtypes.TxKey]struct{} - mu sync.RWMutex // Add a mutex to protect the submitted map - logger log.Logger -} - -// NewCListMempoolReaper initializes the mempool and sets up the gRPC client. -func NewCListMempoolReaper(mempool Mempool, rollupId []byte, seqClient *grpc.Client, logger log.Logger) *CListMempoolReaper { - return &CListMempoolReaper{ - mempool: mempool, - stopCh: make(chan struct{}), - grpcClient: seqClient, - rollupId: rollupId, - submitted: make(map[cmtypes.TxKey]struct{}), - logger: logger, - } -} - -// StartReaper starts the reaper goroutine. -func (r *CListMempoolReaper) StartReaper(ctx context.Context) error { - go func() { - ticker := time.NewTicker(ReapInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - r.reap(ctx) - case <-r.stopCh: - return - } - } - }() - return nil -} - -// UpdateCommitedTxs removes the committed transactions from the submitted map. -func (r *CListMempoolReaper) UpdateCommitedTxs(txs []cmtypes.Tx) { - r.mu.Lock() // Lock the mutex before modifying the map - defer r.mu.Unlock() - for _, tx := range txs { - delete(r.submitted, tx.Key()) - } -} - -// StopReaper stops the reaper goroutine. -func (r *CListMempoolReaper) StopReaper() { - close(r.stopCh) -} - -// reap removes all transactions from the mempool and sends them to the gRPC server. -func (r *CListMempoolReaper) reap(ctx context.Context) { - txs := r.mempool.ReapMaxTxs(-1) - for _, tx := range txs { - r.mu.RLock() // Read lock before checking the map - _, ok := r.submitted[tx.Key()] - r.mu.RUnlock() // Unlock after checking - - if ok { - continue - } - if err := r.retrySubmitTransaction(ctx, tx, MaxRetries, RetryDelay); err != nil { - r.logger.Error("Error submitting transaction", "tx key", tx.Key(), "error", err) - continue - } - r.logger.Info("Reaper submitted transaction successfully", "tx key", tx.Key()) - - r.mu.Lock() // Lock the mutex before writing to the map - r.submitted[tx.Key()] = struct{}{} - r.mu.Unlock() // Unlock after modifying the map - } -} - -func (reaper *CListMempoolReaper) retrySubmitTransaction(ctx context.Context, tx cmtypes.Tx, maxRetries int, delay time.Duration) error { - var err error - for i := 0; i < maxRetries; i++ { - // ignore the response for now as nothing is in there - _, err = reaper.grpcClient.SubmitRollupTransaction(ctx, sequencing.SubmitRollupTransactionRequest{RollupId: reaper.rollupId, Tx: tx}) - if err == nil { - return nil - } - time.Sleep(delay) - } - return err -} diff --git a/mempool/tx.go b/mempool/tx.go deleted file mode 100644 index bbc5060c38..0000000000 --- a/mempool/tx.go +++ /dev/null @@ -1,17 +0,0 @@ -package mempool - -import ( - "github.com/cometbft/cometbft/p2p" -) - -// TxInfo are parameters that get passed when attempting to add a tx to the -// mempool. -type TxInfo struct { - // SenderID is the internal peer ID used in the mempool to identify the - // sender, storing two bytes with each transaction instead of 20 bytes for - // the types.NodeID. - SenderID uint16 - - // SenderP2PID is the actual p2p.ID of the sender, used e.g. for logging. - SenderP2PID p2p.ID -} diff --git a/node/full.go b/node/full.go index 9775749cf0..2cf6024df1 100644 --- a/node/full.go +++ b/node/full.go @@ -19,10 +19,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/log" "github.com/cometbft/cometbft/libs/service" - corep2p "github.com/cometbft/cometbft/p2p" cmtypes "github.com/cometbft/cometbft/types" proxyda "github.com/rollkit/go-da/proxy" @@ -32,7 +30,6 @@ import ( "github.com/rollkit/rollkit/block" "github.com/rollkit/rollkit/config" "github.com/rollkit/rollkit/da" - "github.com/rollkit/rollkit/mempool" "github.com/rollkit/rollkit/p2p" "github.com/rollkit/rollkit/store" "github.com/rollkit/rollkit/types" @@ -68,9 +65,6 @@ type FullNode struct { p2pClient *p2p.Client hSyncService *block.HeaderSyncService dSyncService *block.DataSyncService - // TODO(tzdybal): consider extracting "mempool reactor" - Mempool mempool.Mempool - mempoolIDs *mempoolIDs Store store.Store blockManager *block.Manager @@ -164,7 +158,6 @@ func newFullNode( } node.BaseService = *service.NewBaseService(logger, "Node", node) - node.p2pClient.SetTxValidator(node.newTxValidator(p2pMetrics)) return node, nil } @@ -448,46 +441,6 @@ func (n *FullNode) EventBus() *cmtypes.EventBus { return n.eventBus } -// newTxValidator creates a pubsub validator that uses the node's mempool to check the -// transaction. If the transaction is valid, then it is added to the mempool -func (n *FullNode) newTxValidator(metrics *p2p.Metrics) p2p.GossipValidator { - return func(m *p2p.GossipMessage) bool { - n.Logger.Debug("transaction received", "bytes", len(m.Data)) - msgBytes := m.Data - labels := []string{ - "peer_id", m.From.String(), - "chID", n.genesis.ChainID, - } - metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) - metrics.MessageReceiveBytesTotal.With("message_type", "tx").Add(float64(len(msgBytes))) - checkTxResCh := make(chan *abci.ResponseCheckTx, 1) - err := n.Mempool.CheckTx(m.Data, func(resp *abci.ResponseCheckTx) { - select { - case <-n.ctx.Done(): - return - case checkTxResCh <- resp: - } - }, mempool.TxInfo{ - SenderID: n.mempoolIDs.GetForPeer(m.From), - SenderP2PID: corep2p.ID(m.From), - }) - switch { - case errors.Is(err, mempool.ErrTxInCache): - return true - case errors.Is(err, mempool.ErrMempoolIsFull{}): - return true - case errors.Is(err, mempool.ErrTxTooLarge{}): - return false - case errors.Is(err, mempool.ErrPreCheck{}): - return false - default: - } - checkTxResp := <-checkTxResCh - - return checkTxResp.Code == abci.CodeTypeOK - } -} - func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore { return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore) } diff --git a/node/light.go b/node/light.go index 81aeedcb74..a0074c2e8c 100644 --- a/node/light.go +++ b/node/light.go @@ -72,8 +72,6 @@ func newLightNode( ctx: ctx, } - node.P2P.SetTxValidator(node.falseValidator()) - node.BaseService = *service.NewBaseService(logger, "LightNode", node) return node, nil @@ -113,10 +111,3 @@ func (ln *LightNode) OnStop() { err = errors.Join(err, ln.hSyncService.Stop(ln.ctx)) ln.Logger.Error("errors while stopping node:", "errors", err) } - -// Dummy validator that always returns a callback function with boolean `false` -func (ln *LightNode) falseValidator() p2p.GossipValidator { - return func(*p2p.GossipMessage) bool { - return false - } -} diff --git a/node/mempool.go b/node/mempool.go deleted file mode 100644 index 21e7720bef..0000000000 --- a/node/mempool.go +++ /dev/null @@ -1,75 +0,0 @@ -package node - -import ( - "fmt" - "math" - "sync" - - "github.com/libp2p/go-libp2p/core/peer" -) - -const ( - maxActiveIDs = math.MaxUint16 -) - -type mempoolIDs struct { - mtx sync.RWMutex - peerMap map[peer.ID]uint16 - nextID uint16 // assumes that a node will never have over 65536 active peers - activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter -} - -// Reserve searches for the next unused ID and assigns it to the -// peer. -func (ids *mempoolIDs) ReserveForPeer(peer peer.ID) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - curID := ids.nextPeerID() - ids.peerMap[peer] = curID - ids.activeIDs[curID] = struct{}{} -} - -// nextPeerID returns the next unused peer ID to use. -// This assumes that ids's mutex is already locked. -func (ids *mempoolIDs) nextPeerID() uint16 { - if len(ids.activeIDs) == maxActiveIDs { - panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) - } - - _, idExists := ids.activeIDs[ids.nextID] - for idExists { - ids.nextID++ - _, idExists = ids.activeIDs[ids.nextID] - } - curID := ids.nextID - ids.nextID++ - return curID -} - -// Reclaim returns the ID reserved for the peer back to unused pool. -func (ids *mempoolIDs) Reclaim(peer peer.ID) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - removedID, ok := ids.peerMap[peer] - if ok { - delete(ids.activeIDs, removedID) - delete(ids.peerMap, peer) - } -} - -// GetForPeer returns an ID for the peer. ID is generated if required. -func (ids *mempoolIDs) GetForPeer(peer peer.ID) uint16 { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - id, ok := ids.peerMap[peer] - if !ok { - id = ids.nextPeerID() - ids.peerMap[peer] = id - ids.activeIDs[id] = struct{}{} - } - - return id -} diff --git a/p2p/client.go b/p2p/client.go index d74f5d7b48..922329ce3e 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -38,9 +38,6 @@ const ( // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 - - // txTopicSuffix is added after namespace to create pubsub topic for TX gossiping. - txTopicSuffix = "-tx" ) // Client is a P2P client, implemented with libp2p. @@ -59,9 +56,6 @@ type Client struct { gater *conngater.BasicConnectionGater ps *pubsub.PubSub - txGossiper *Gossiper - txValidator GossipValidator - // cancel is used to cancel context passed to libp2p functions // it's required because of discovery.Advertise call cancel context.CancelFunc @@ -109,7 +103,7 @@ func (c *Client) Start(ctx context.Context) error { // create new, cancelable context ctx, c.cancel = context.WithCancel(ctx) c.logger.Debug("starting P2P client") - host, err := c.listen(ctx) + host, err := c.listen() if err != nil { return err } @@ -155,23 +149,11 @@ func (c *Client) Close() error { c.cancel() return errors.Join( - c.txGossiper.Close(), c.dht.Close(), c.host.Close(), ) } -// GossipTx sends the transaction to the P2P network. -func (c *Client) GossipTx(ctx context.Context, tx []byte) error { - c.logger.Debug("Gossiping TX", "len", len(tx)) - return c.txGossiper.Publish(ctx, tx) -} - -// SetTxValidator sets the callback function, that will be invoked during message gossiping. -func (c *Client) SetTxValidator(val GossipValidator) { - c.txValidator = val -} - // Addrs returns listen addresses of Client. func (c *Client) Addrs() []multiaddr.Multiaddr { return c.host.Addrs() @@ -243,7 +225,7 @@ func (c *Client) Peers() []PeerConnection { return res } -func (c *Client) listen(ctx context.Context) (host.Host, error) { +func (c *Client) listen() (host.Host, error) { maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) if err != nil { return nil, err @@ -358,13 +340,6 @@ func (c *Client) setupGossiping(ctx context.Context) error { if err != nil { return err } - - c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), c.logger, WithValidator(c.txValidator)) - if err != nil { - return err - } - go c.txGossiper.ProcessMessages(ctx) - return nil } @@ -398,7 +373,3 @@ func (c *Client) parseAddrInfoList(addrInfoStr string) []peer.AddrInfo { func (c *Client) getNamespace() string { return c.chainID } - -func (c *Client) getTxTopic() string { - return c.getNamespace() + txTopicSuffix -} diff --git a/p2p/client_test.go b/p2p/client_test.go index ad7b5d1a68..8d573a8dfc 100644 --- a/p2p/client_test.go +++ b/p2p/client_test.go @@ -3,9 +3,7 @@ package p2p import ( "context" "crypto/rand" - "sync" "testing" - "time" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" @@ -71,7 +69,7 @@ func TestBootstrapping(t *testing.T) { 1: {conns: []int{0}}, 2: {conns: []int{0, 1}}, 3: {conns: []int{0}}, - }, make([]GossipValidator, 4), logger) + }, logger) // wait for clients to finish refreshing routing tables clients.WaitForDHT() @@ -92,7 +90,7 @@ func TestDiscovery(t *testing.T) { 2: {conns: []int{0}, chainID: "ORU2"}, 3: {conns: []int{1}, chainID: "ORU1"}, 4: {conns: []int{2}, chainID: "ORU1"}, - }, make([]GossipValidator, 5), logger) + }, logger) // wait for clients to finish refreshing routing tables clients.WaitForDHT() @@ -101,58 +99,6 @@ func TestDiscovery(t *testing.T) { assert.Contains(clients[4].host.Network().Peers(), clients[3].host.ID()) } -func TestGossiping(t *testing.T) { - assert := assert.New(t) - logger := test.NewFileLogger(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var expectedMsg = []byte("foobar") - var wg sync.WaitGroup - - wg.Add(3) - - // ensure that Tx is delivered to client - assertRecv := func(tx *GossipMessage) bool { - logger.Debug("received tx", "body", string(tx.Data), "from", tx.From) - assert.Equal(expectedMsg, tx.Data) - wg.Done() - return true - } - - // ensure that Tx is not delivered to client - assertNotRecv := func(*GossipMessage) bool { - t.Fatal("unexpected Tx received") - return false - } - - validators := []GossipValidator{assertRecv, assertNotRecv, assertNotRecv, assertRecv, assertRecv} - - // network connections topology: 3<->1<->0<->2<->4 - clients := startTestNetwork(ctx, t, 5, map[int]hostDescr{ - 0: {conns: []int{}, chainID: "2"}, - 1: {conns: []int{0}, chainID: "1", realKey: true}, - 2: {conns: []int{0}, chainID: "1", realKey: true}, - 3: {conns: []int{1}, chainID: "2", realKey: true}, - 4: {conns: []int{2}, chainID: "2", realKey: true}, - }, validators, logger) - - // wait for clients to finish refreshing routing tables - clients.WaitForDHT() - - // this sleep is required for pubsub to "propagate" subscription information - // TODO(tzdybal): is there a better way to wait for readiness? - time.Sleep(1 * time.Second) - - // gossip from client 4 - err := clients[4].GossipTx(ctx, expectedMsg) - assert.NoError(err) - - // wait for clients that should receive Tx - wg.Wait() -} - func TestSeedStringParsing(t *testing.T) { t.Parallel() diff --git a/p2p/gossip.go b/p2p/gossip.go deleted file mode 100644 index bee023d29d..0000000000 --- a/p2p/gossip.go +++ /dev/null @@ -1,114 +0,0 @@ -package p2p - -import ( - "context" - "errors" - - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/rollkit/rollkit/third_party/log" -) - -// GossipMessage represents message gossiped via P2P network (e.g. transaction, Block etc). -type GossipMessage struct { - Data []byte - From peer.ID -} - -// GossipValidator is a callback function type. -type GossipValidator func(*GossipMessage) bool - -// GossiperOption sets optional parameters of Gossiper. -type GossiperOption func(*Gossiper) error - -// WithValidator options registers topic validator for Gossiper. -func WithValidator(validator GossipValidator) GossiperOption { - return func(g *Gossiper) error { - return g.ps.RegisterTopicValidator(g.topic.String(), wrapValidator(validator)) - } -} - -// Gossiper is an abstraction of P2P publish subscribe mechanism. -type Gossiper struct { - ownID peer.ID - - ps *pubsub.PubSub - topic *pubsub.Topic - sub *pubsub.Subscription - - logger log.Logger -} - -// NewGossiper creates new, ready to use instance of Gossiper. -// -// Returned Gossiper object can be used for sending (Publishing) and receiving messages in topic identified by topicStr. -func NewGossiper(host host.Host, ps *pubsub.PubSub, topicStr string, logger log.Logger, options ...GossiperOption) (*Gossiper, error) { - topic, err := ps.Join(topicStr) - if err != nil { - return nil, err - } - - subscription, err := topic.Subscribe() - if err != nil { - return nil, err - } - g := &Gossiper{ - ownID: host.ID(), - ps: ps, - topic: topic, - sub: subscription, - logger: logger, - } - - for _, option := range options { - err := option(g) - if err != nil { - return nil, err - } - } - - return g, nil -} - -// Close is used to disconnect from topic and free resources used by Gossiper. -func (g *Gossiper) Close() error { - err := g.ps.UnregisterTopicValidator(g.topic.String()) - g.sub.Cancel() - return errors.Join( - err, - g.topic.Close(), - ) -} - -// Publish publishes data to gossip topic. -func (g *Gossiper) Publish(ctx context.Context, data []byte) error { - return g.topic.Publish(ctx, data) -} - -// ProcessMessages waits for messages published in the topic and execute handler. -func (g *Gossiper) ProcessMessages(ctx context.Context) { - for { - _, err := g.sub.Next(ctx) - select { - case <-ctx.Done(): - return - default: - if err != nil { - g.logger.Error("failed to read message", "error", err) - return - } - } - // Logic is handled in validator - } -} - -func wrapValidator(validator GossipValidator) pubsub.Validator { - return func(_ context.Context, _ peer.ID, msg *pubsub.Message) bool { - return validator(&GossipMessage{ - Data: msg.Data, - From: msg.GetFrom(), - }) - } -} diff --git a/p2p/utils_test.go b/p2p/utils_test.go index 7d5fca431e..129bc8b7f3 100644 --- a/p2p/utils_test.go +++ b/p2p/utils_test.go @@ -64,7 +64,7 @@ func getAddr(sk crypto.PrivKey) (multiaddr.Multiaddr, error) { return a, nil } -func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hostDescr, validators []GossipValidator, logger log.Logger) testNet { +func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hostDescr, logger log.Logger) testNet { t.Helper() require := require.New(t) @@ -111,7 +111,6 @@ func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hos require.NoError(err) require.NotNil(client) - client.SetTxValidator(validators[i]) clients[i] = client }