Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/libs/service"
corep2p "github.com/cometbft/cometbft/p2p"
cmtypes "github.com/cometbft/cometbft/types"

proxyda "github.com/rollkit/go-da/proxy"
Expand Down Expand Up @@ -75,7 +73,6 @@ type FullNode struct {
dSyncService *block.DataSyncService
// TODO(tzdybal): consider extracting "mempool reactor"
Mempool mempool.Mempool
mempoolIDs *mempoolIDs
Store store.Store
blockManager *block.Manager

Expand Down Expand Up @@ -181,7 +178,6 @@ func newFullNode(
}

node.BaseService = *service.NewBaseService(logger, "Node", node)
node.p2pClient.SetTxValidator(node.newTxValidator(p2pMetrics))

return node, nil
}
Expand Down Expand Up @@ -466,46 +462,6 @@ func (n *FullNode) EventBus() *cmtypes.EventBus {
return n.eventBus
}

// newTxValidator creates a pubsub validator that uses the node's mempool to check the
// transaction. If the transaction is valid, then it is added to the mempool
func (n *FullNode) newTxValidator(metrics *p2p.Metrics) p2p.GossipValidator {
return func(m *p2p.GossipMessage) bool {
n.Logger.Debug("transaction received", "bytes", len(m.Data))
msgBytes := m.Data
labels := []string{
"peer_id", m.From.String(),
"chID", n.genesis.ChainID,
}
metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
metrics.MessageReceiveBytesTotal.With("message_type", "tx").Add(float64(len(msgBytes)))
checkTxResCh := make(chan *abci.ResponseCheckTx, 1)
err := n.Mempool.CheckTx(m.Data, func(resp *abci.ResponseCheckTx) {
select {
case <-n.ctx.Done():
return
case checkTxResCh <- resp:
}
}, mempool.TxInfo{
SenderID: n.mempoolIDs.GetForPeer(m.From),
SenderP2PID: corep2p.ID(m.From),
})
switch {
case errors.Is(err, mempool.ErrTxInCache):
return true
case errors.Is(err, mempool.ErrMempoolIsFull{}):
return true
case errors.Is(err, mempool.ErrTxTooLarge{}):
return false
case errors.Is(err, mempool.ErrPreCheck{}):
return false
default:
}
checkTxResp := <-checkTxResCh

return checkTxResp.Code == abci.CodeTypeOK
}
}

func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore {
return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore)
}
Expand Down
9 changes: 0 additions & 9 deletions node/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ func newLightNode(
ctx: ctx,
}

node.P2P.SetTxValidator(node.falseValidator())

node.BaseService = *service.NewBaseService(logger, "LightNode", node)

return node, nil
Expand Down Expand Up @@ -113,10 +111,3 @@ func (ln *LightNode) OnStop() {
err = errors.Join(err, ln.hSyncService.Stop(ln.ctx))
ln.Logger.Error("errors while stopping node:", "errors", err)
}

// Dummy validator that always returns a callback function with boolean `false`
func (ln *LightNode) falseValidator() p2p.GossipValidator {
return func(*p2p.GossipMessage) bool {
return false
}
}
33 changes: 2 additions & 31 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ const (

// peerLimit defines limit of number of peers returned during active peer discovery.
peerLimit = 60

// txTopicSuffix is added after namespace to create pubsub topic for TX gossiping.
txTopicSuffix = "-tx"
)

// Client is a P2P client, implemented with libp2p.
Expand All @@ -59,9 +56,6 @@ type Client struct {
gater *conngater.BasicConnectionGater
ps *pubsub.PubSub

txGossiper *Gossiper
txValidator GossipValidator

// cancel is used to cancel context passed to libp2p functions
// it's required because of discovery.Advertise call
cancel context.CancelFunc
Expand Down Expand Up @@ -109,7 +103,7 @@ func (c *Client) Start(ctx context.Context) error {
// create new, cancelable context
ctx, c.cancel = context.WithCancel(ctx)
c.logger.Debug("starting P2P client")
host, err := c.listen(ctx)
host, err := c.listen()
if err != nil {
return err
}
Expand Down Expand Up @@ -155,23 +149,11 @@ func (c *Client) Close() error {
c.cancel()

return errors.Join(
c.txGossiper.Close(),
c.dht.Close(),
c.host.Close(),
)
}

// GossipTx sends the transaction to the P2P network.
func (c *Client) GossipTx(ctx context.Context, tx []byte) error {
c.logger.Debug("Gossiping TX", "len", len(tx))
return c.txGossiper.Publish(ctx, tx)
}

// SetTxValidator sets the callback function, that will be invoked during message gossiping.
func (c *Client) SetTxValidator(val GossipValidator) {
c.txValidator = val
}

// Addrs returns listen addresses of Client.
func (c *Client) Addrs() []multiaddr.Multiaddr {
return c.host.Addrs()
Expand Down Expand Up @@ -243,7 +225,7 @@ func (c *Client) Peers() []PeerConnection {
return res
}

func (c *Client) listen(ctx context.Context) (host.Host, error) {
func (c *Client) listen() (host.Host, error) {
maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress)
if err != nil {
return nil, err
Expand Down Expand Up @@ -358,13 +340,6 @@ func (c *Client) setupGossiping(ctx context.Context) error {
if err != nil {
return err
}

c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), c.logger, WithValidator(c.txValidator))
if err != nil {
return err
}
go c.txGossiper.ProcessMessages(ctx)

return nil
}

Expand Down Expand Up @@ -398,7 +373,3 @@ func (c *Client) parseAddrInfoList(addrInfoStr string) []peer.AddrInfo {
func (c *Client) getNamespace() string {
return c.chainID
}

func (c *Client) getTxTopic() string {
return c.getNamespace() + txTopicSuffix
}
58 changes: 2 additions & 56 deletions p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package p2p
import (
"context"
"crypto/rand"
"sync"
"testing"
"time"

"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -71,7 +69,7 @@ func TestBootstrapping(t *testing.T) {
1: {conns: []int{0}},
2: {conns: []int{0, 1}},
3: {conns: []int{0}},
}, make([]GossipValidator, 4), logger)
}, logger)

// wait for clients to finish refreshing routing tables
clients.WaitForDHT()
Expand All @@ -92,7 +90,7 @@ func TestDiscovery(t *testing.T) {
2: {conns: []int{0}, chainID: "ORU2"},
3: {conns: []int{1}, chainID: "ORU1"},
4: {conns: []int{2}, chainID: "ORU1"},
}, make([]GossipValidator, 5), logger)
}, logger)

// wait for clients to finish refreshing routing tables
clients.WaitForDHT()
Expand All @@ -101,58 +99,6 @@ func TestDiscovery(t *testing.T) {
assert.Contains(clients[4].host.Network().Peers(), clients[3].host.ID())
}

func TestGossiping(t *testing.T) {
assert := assert.New(t)
logger := test.NewFileLogger(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var expectedMsg = []byte("foobar")
var wg sync.WaitGroup

wg.Add(3)

// ensure that Tx is delivered to client
assertRecv := func(tx *GossipMessage) bool {
logger.Debug("received tx", "body", string(tx.Data), "from", tx.From)
assert.Equal(expectedMsg, tx.Data)
wg.Done()
return true
}

// ensure that Tx is not delivered to client
assertNotRecv := func(*GossipMessage) bool {
t.Fatal("unexpected Tx received")
return false
}

validators := []GossipValidator{assertRecv, assertNotRecv, assertNotRecv, assertRecv, assertRecv}

// network connections topology: 3<->1<->0<->2<->4
clients := startTestNetwork(ctx, t, 5, map[int]hostDescr{
0: {conns: []int{}, chainID: "2"},
1: {conns: []int{0}, chainID: "1", realKey: true},
2: {conns: []int{0}, chainID: "1", realKey: true},
3: {conns: []int{1}, chainID: "2", realKey: true},
4: {conns: []int{2}, chainID: "2", realKey: true},
}, validators, logger)

// wait for clients to finish refreshing routing tables
clients.WaitForDHT()

// this sleep is required for pubsub to "propagate" subscription information
// TODO(tzdybal): is there a better way to wait for readiness?
time.Sleep(1 * time.Second)

// gossip from client 4
err := clients[4].GossipTx(ctx, expectedMsg)
assert.NoError(err)

// wait for clients that should receive Tx
wg.Wait()
}

func TestSeedStringParsing(t *testing.T) {
t.Parallel()

Expand Down
114 changes: 0 additions & 114 deletions p2p/gossip.go

This file was deleted.

3 changes: 1 addition & 2 deletions p2p/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func getAddr(sk crypto.PrivKey) (multiaddr.Multiaddr, error) {
return a, nil
}

func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hostDescr, validators []GossipValidator, logger log.Logger) testNet {
func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hostDescr, logger log.Logger) testNet {
t.Helper()
require := require.New(t)

Expand Down Expand Up @@ -111,7 +111,6 @@ func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hos
require.NoError(err)
require.NotNil(client)

client.SetTxValidator(validators[i])
clients[i] = client
}

Expand Down
Loading