diff --git a/node/full.go b/node/full.go index 2bdf95501d..e00fd80270 100644 --- a/node/full.go +++ b/node/full.go @@ -19,10 +19,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/log" "github.com/cometbft/cometbft/libs/service" - corep2p "github.com/cometbft/cometbft/p2p" cmtypes "github.com/cometbft/cometbft/types" proxyda "github.com/rollkit/go-da/proxy" @@ -75,7 +73,6 @@ type FullNode struct { dSyncService *block.DataSyncService // TODO(tzdybal): consider extracting "mempool reactor" Mempool mempool.Mempool - mempoolIDs *mempoolIDs Store store.Store blockManager *block.Manager @@ -181,7 +178,6 @@ func newFullNode( } node.BaseService = *service.NewBaseService(logger, "Node", node) - node.p2pClient.SetTxValidator(node.newTxValidator(p2pMetrics)) return node, nil } @@ -466,46 +462,6 @@ func (n *FullNode) EventBus() *cmtypes.EventBus { return n.eventBus } -// newTxValidator creates a pubsub validator that uses the node's mempool to check the -// transaction. If the transaction is valid, then it is added to the mempool -func (n *FullNode) newTxValidator(metrics *p2p.Metrics) p2p.GossipValidator { - return func(m *p2p.GossipMessage) bool { - n.Logger.Debug("transaction received", "bytes", len(m.Data)) - msgBytes := m.Data - labels := []string{ - "peer_id", m.From.String(), - "chID", n.genesis.ChainID, - } - metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) - metrics.MessageReceiveBytesTotal.With("message_type", "tx").Add(float64(len(msgBytes))) - checkTxResCh := make(chan *abci.ResponseCheckTx, 1) - err := n.Mempool.CheckTx(m.Data, func(resp *abci.ResponseCheckTx) { - select { - case <-n.ctx.Done(): - return - case checkTxResCh <- resp: - } - }, mempool.TxInfo{ - SenderID: n.mempoolIDs.GetForPeer(m.From), - SenderP2PID: corep2p.ID(m.From), - }) - switch { - case errors.Is(err, mempool.ErrTxInCache): - return true - case errors.Is(err, mempool.ErrMempoolIsFull{}): - return true - case errors.Is(err, mempool.ErrTxTooLarge{}): - return false - case errors.Is(err, mempool.ErrPreCheck{}): - return false - default: - } - checkTxResp := <-checkTxResCh - - return checkTxResp.Code == abci.CodeTypeOK - } -} - func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore { return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore) } diff --git a/node/light.go b/node/light.go index c9d5defa0e..0aa743def3 100644 --- a/node/light.go +++ b/node/light.go @@ -72,8 +72,6 @@ func newLightNode( ctx: ctx, } - node.P2P.SetTxValidator(node.falseValidator()) - node.BaseService = *service.NewBaseService(logger, "LightNode", node) return node, nil @@ -113,10 +111,3 @@ func (ln *LightNode) OnStop() { err = errors.Join(err, ln.hSyncService.Stop(ln.ctx)) ln.Logger.Error("errors while stopping node:", "errors", err) } - -// Dummy validator that always returns a callback function with boolean `false` -func (ln *LightNode) falseValidator() p2p.GossipValidator { - return func(*p2p.GossipMessage) bool { - return false - } -} diff --git a/p2p/client.go b/p2p/client.go index d74f5d7b48..922329ce3e 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -38,9 +38,6 @@ const ( // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 - - // txTopicSuffix is added after namespace to create pubsub topic for TX gossiping. - txTopicSuffix = "-tx" ) // Client is a P2P client, implemented with libp2p. @@ -59,9 +56,6 @@ type Client struct { gater *conngater.BasicConnectionGater ps *pubsub.PubSub - txGossiper *Gossiper - txValidator GossipValidator - // cancel is used to cancel context passed to libp2p functions // it's required because of discovery.Advertise call cancel context.CancelFunc @@ -109,7 +103,7 @@ func (c *Client) Start(ctx context.Context) error { // create new, cancelable context ctx, c.cancel = context.WithCancel(ctx) c.logger.Debug("starting P2P client") - host, err := c.listen(ctx) + host, err := c.listen() if err != nil { return err } @@ -155,23 +149,11 @@ func (c *Client) Close() error { c.cancel() return errors.Join( - c.txGossiper.Close(), c.dht.Close(), c.host.Close(), ) } -// GossipTx sends the transaction to the P2P network. -func (c *Client) GossipTx(ctx context.Context, tx []byte) error { - c.logger.Debug("Gossiping TX", "len", len(tx)) - return c.txGossiper.Publish(ctx, tx) -} - -// SetTxValidator sets the callback function, that will be invoked during message gossiping. -func (c *Client) SetTxValidator(val GossipValidator) { - c.txValidator = val -} - // Addrs returns listen addresses of Client. func (c *Client) Addrs() []multiaddr.Multiaddr { return c.host.Addrs() @@ -243,7 +225,7 @@ func (c *Client) Peers() []PeerConnection { return res } -func (c *Client) listen(ctx context.Context) (host.Host, error) { +func (c *Client) listen() (host.Host, error) { maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) if err != nil { return nil, err @@ -358,13 +340,6 @@ func (c *Client) setupGossiping(ctx context.Context) error { if err != nil { return err } - - c.txGossiper, err = NewGossiper(c.host, c.ps, c.getTxTopic(), c.logger, WithValidator(c.txValidator)) - if err != nil { - return err - } - go c.txGossiper.ProcessMessages(ctx) - return nil } @@ -398,7 +373,3 @@ func (c *Client) parseAddrInfoList(addrInfoStr string) []peer.AddrInfo { func (c *Client) getNamespace() string { return c.chainID } - -func (c *Client) getTxTopic() string { - return c.getNamespace() + txTopicSuffix -} diff --git a/p2p/client_test.go b/p2p/client_test.go index ad7b5d1a68..8d573a8dfc 100644 --- a/p2p/client_test.go +++ b/p2p/client_test.go @@ -3,9 +3,7 @@ package p2p import ( "context" "crypto/rand" - "sync" "testing" - "time" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" @@ -71,7 +69,7 @@ func TestBootstrapping(t *testing.T) { 1: {conns: []int{0}}, 2: {conns: []int{0, 1}}, 3: {conns: []int{0}}, - }, make([]GossipValidator, 4), logger) + }, logger) // wait for clients to finish refreshing routing tables clients.WaitForDHT() @@ -92,7 +90,7 @@ func TestDiscovery(t *testing.T) { 2: {conns: []int{0}, chainID: "ORU2"}, 3: {conns: []int{1}, chainID: "ORU1"}, 4: {conns: []int{2}, chainID: "ORU1"}, - }, make([]GossipValidator, 5), logger) + }, logger) // wait for clients to finish refreshing routing tables clients.WaitForDHT() @@ -101,58 +99,6 @@ func TestDiscovery(t *testing.T) { assert.Contains(clients[4].host.Network().Peers(), clients[3].host.ID()) } -func TestGossiping(t *testing.T) { - assert := assert.New(t) - logger := test.NewFileLogger(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var expectedMsg = []byte("foobar") - var wg sync.WaitGroup - - wg.Add(3) - - // ensure that Tx is delivered to client - assertRecv := func(tx *GossipMessage) bool { - logger.Debug("received tx", "body", string(tx.Data), "from", tx.From) - assert.Equal(expectedMsg, tx.Data) - wg.Done() - return true - } - - // ensure that Tx is not delivered to client - assertNotRecv := func(*GossipMessage) bool { - t.Fatal("unexpected Tx received") - return false - } - - validators := []GossipValidator{assertRecv, assertNotRecv, assertNotRecv, assertRecv, assertRecv} - - // network connections topology: 3<->1<->0<->2<->4 - clients := startTestNetwork(ctx, t, 5, map[int]hostDescr{ - 0: {conns: []int{}, chainID: "2"}, - 1: {conns: []int{0}, chainID: "1", realKey: true}, - 2: {conns: []int{0}, chainID: "1", realKey: true}, - 3: {conns: []int{1}, chainID: "2", realKey: true}, - 4: {conns: []int{2}, chainID: "2", realKey: true}, - }, validators, logger) - - // wait for clients to finish refreshing routing tables - clients.WaitForDHT() - - // this sleep is required for pubsub to "propagate" subscription information - // TODO(tzdybal): is there a better way to wait for readiness? - time.Sleep(1 * time.Second) - - // gossip from client 4 - err := clients[4].GossipTx(ctx, expectedMsg) - assert.NoError(err) - - // wait for clients that should receive Tx - wg.Wait() -} - func TestSeedStringParsing(t *testing.T) { t.Parallel() diff --git a/p2p/gossip.go b/p2p/gossip.go deleted file mode 100644 index bee023d29d..0000000000 --- a/p2p/gossip.go +++ /dev/null @@ -1,114 +0,0 @@ -package p2p - -import ( - "context" - "errors" - - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/rollkit/rollkit/third_party/log" -) - -// GossipMessage represents message gossiped via P2P network (e.g. transaction, Block etc). -type GossipMessage struct { - Data []byte - From peer.ID -} - -// GossipValidator is a callback function type. -type GossipValidator func(*GossipMessage) bool - -// GossiperOption sets optional parameters of Gossiper. -type GossiperOption func(*Gossiper) error - -// WithValidator options registers topic validator for Gossiper. -func WithValidator(validator GossipValidator) GossiperOption { - return func(g *Gossiper) error { - return g.ps.RegisterTopicValidator(g.topic.String(), wrapValidator(validator)) - } -} - -// Gossiper is an abstraction of P2P publish subscribe mechanism. -type Gossiper struct { - ownID peer.ID - - ps *pubsub.PubSub - topic *pubsub.Topic - sub *pubsub.Subscription - - logger log.Logger -} - -// NewGossiper creates new, ready to use instance of Gossiper. -// -// Returned Gossiper object can be used for sending (Publishing) and receiving messages in topic identified by topicStr. -func NewGossiper(host host.Host, ps *pubsub.PubSub, topicStr string, logger log.Logger, options ...GossiperOption) (*Gossiper, error) { - topic, err := ps.Join(topicStr) - if err != nil { - return nil, err - } - - subscription, err := topic.Subscribe() - if err != nil { - return nil, err - } - g := &Gossiper{ - ownID: host.ID(), - ps: ps, - topic: topic, - sub: subscription, - logger: logger, - } - - for _, option := range options { - err := option(g) - if err != nil { - return nil, err - } - } - - return g, nil -} - -// Close is used to disconnect from topic and free resources used by Gossiper. -func (g *Gossiper) Close() error { - err := g.ps.UnregisterTopicValidator(g.topic.String()) - g.sub.Cancel() - return errors.Join( - err, - g.topic.Close(), - ) -} - -// Publish publishes data to gossip topic. -func (g *Gossiper) Publish(ctx context.Context, data []byte) error { - return g.topic.Publish(ctx, data) -} - -// ProcessMessages waits for messages published in the topic and execute handler. -func (g *Gossiper) ProcessMessages(ctx context.Context) { - for { - _, err := g.sub.Next(ctx) - select { - case <-ctx.Done(): - return - default: - if err != nil { - g.logger.Error("failed to read message", "error", err) - return - } - } - // Logic is handled in validator - } -} - -func wrapValidator(validator GossipValidator) pubsub.Validator { - return func(_ context.Context, _ peer.ID, msg *pubsub.Message) bool { - return validator(&GossipMessage{ - Data: msg.Data, - From: msg.GetFrom(), - }) - } -} diff --git a/p2p/utils_test.go b/p2p/utils_test.go index 7d5fca431e..129bc8b7f3 100644 --- a/p2p/utils_test.go +++ b/p2p/utils_test.go @@ -64,7 +64,7 @@ func getAddr(sk crypto.PrivKey) (multiaddr.Multiaddr, error) { return a, nil } -func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hostDescr, validators []GossipValidator, logger log.Logger) testNet { +func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hostDescr, logger log.Logger) testNet { t.Helper() require := require.New(t) @@ -111,7 +111,6 @@ func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hos require.NoError(err) require.NotNil(client) - client.SetTxValidator(validators[i]) clients[i] = client }