Skip to content

Commit 2331bb5

Browse files
committed
Mempool <-> P2P integration (work in progress)
1 parent 5bf7f48 commit 2331bb5

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

node/node.go

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package node
33
import (
44
"context"
55
"fmt"
6+
"reflect"
67

78
abci "github.com/lazyledger/lazyledger-core/abci/types"
89
llcfg "github.com/lazyledger/lazyledger-core/config"
10+
"github.com/lazyledger/lazyledger-core/libs/clist"
911
"github.com/lazyledger/lazyledger-core/libs/log"
1012
"github.com/lazyledger/lazyledger-core/libs/service"
1113
"github.com/lazyledger/lazyledger-core/mempool"
@@ -73,10 +75,11 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
7375
return node, nil
7476
}
7577

76-
func (n *Node) mempoolLoop(ctx context.Context) {
78+
func (n *Node) mempoolReadLoop(ctx context.Context) {
7779
for {
7880
select {
7981
case tx := <-n.incommingTxCh:
82+
n.Logger.Debug("tx received", "from", tx.From, "bytes", len(tx.Data))
8083
n.Mempool.CheckTx(tx.Data, func(resp *abci.Response) {}, mempool.TxInfo{
8184
SenderID: n.mempoolIDs.GetForPeer(tx.From),
8285
SenderP2PID: corep2p.ID(tx.From),
@@ -88,13 +91,59 @@ func (n *Node) mempoolLoop(ctx context.Context) {
8891
}
8992
}
9093

94+
func (n *Node) mempoolPublishLoop(ctx context.Context) {
95+
rawMempool := n.Mempool.(*mempool.CListMempool)
96+
var next *clist.CElement
97+
98+
for {
99+
// wait for transactions
100+
if next == nil {
101+
select {
102+
case <-rawMempool.TxsWaitChan():
103+
if next = rawMempool.TxsFront(); next != nil {
104+
continue
105+
}
106+
case <-ctx.Done():
107+
return
108+
}
109+
}
110+
111+
// send transactions
112+
for {
113+
v := reflect.Indirect(reflect.ValueOf(next.Value))
114+
f := v.FieldByName("tx")
115+
tx := f.Bytes()
116+
117+
err := n.P2P.GossipTx(ctx, tx)
118+
if err != nil {
119+
n.Logger.Error("failed to gossip transaction", "error", err)
120+
continue
121+
}
122+
123+
n := next.Next()
124+
if n == nil {
125+
continue
126+
}
127+
next = n
128+
}
129+
130+
select {
131+
case <-next.NextWaitChan():
132+
next = next.Next()
133+
case <-ctx.Done():
134+
return
135+
}
136+
}
137+
}
138+
91139
func (n *Node) OnStart() error {
92140
n.Logger.Info("starting P2P client")
93141
err := n.P2P.Start(n.ctx)
94142
if err != nil {
95143
return fmt.Errorf("error while starting P2P client: %w", err)
96144
}
97-
go n.mempoolLoop(n.ctx)
145+
go n.mempoolReadLoop(n.ctx)
146+
go n.mempoolPublishLoop(n.ctx)
98147
n.P2P.SetTxHandler(func(tx *p2p.Tx) {
99148
n.incommingTxCh <- tx
100149
})

p2p/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ func (c *Client) Close() error {
142142
}
143143

144144
func (c *Client) GossipTx(ctx context.Context, tx []byte) error {
145+
c.logger.Debug("Gossiping TX", "len", len(tx))
145146
return c.txTopic.Publish(ctx, tx)
146147
}
147148

0 commit comments

Comments
 (0)