Commit 048e157

Fix test

1 parent: 8f780fc

5 files changed: 100 additions & 39 deletions

File tree:
  block/store.go
  block/sync.go
  node/full.go
  node/full_node_integration_test.go
  node/helpers_test.go

block/store.go

Lines changed: 3 additions & 1 deletion
@@ -91,7 +91,9 @@ func (m *Manager) DataStoreRetrieveLoop(ctx context.Context) {
             }
             //TODO: remove junk if possible
             m.logger.Debug("data retrieved from p2p data sync", "dataHeight", d.Metadata.Height, "daHeight", daHeight)
-            m.dataInCh <- NewDataEvent{d, daHeight}
+            if len(d.Txs) > 0 {
+                m.dataInCh <- NewDataEvent{d, daHeight}
+            }
         }
     }
     lastDataStoreHeight = dataStoreHeight
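The store.go change gates the channel send on whether the retrieved data actually carries transactions, so empty payloads are no longer forwarded into the sync pipeline. Below is a minimal, self-contained sketch of that guard; Data, NewDataEvent, and dataInCh mirror the names in the diff, but the type definitions are simplified stand-ins rather than the real rollkit declarations.

package main

import "fmt"

// Data and NewDataEvent are simplified stand-ins for the types used in the diff.
type Data struct {
	Txs [][]byte
}

type NewDataEvent struct {
	Data     *Data
	DAHeight uint64
}

func main() {
	dataInCh := make(chan NewDataEvent, 1)

	d := &Data{Txs: nil} // an empty payload retrieved from the p2p data store
	daHeight := uint64(42)

	// Forward only data events that actually carry transactions,
	// mirroring the len(d.Txs) > 0 guard added in the hunk.
	if len(d.Txs) > 0 {
		dataInCh <- NewDataEvent{d, daHeight}
	} else {
		fmt.Println("skipping empty data event at DA height", daHeight)
	}
}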

block/sync.go

Lines changed: 12 additions & 10 deletions
@@ -46,9 +46,6 @@ func (m *Manager) SyncLoop(ctx context.Context, errCh chan<- error) {
             }
             m.headerCache.SetItem(headerHeight, header)

-            m.sendNonBlockingSignalToHeaderStoreCh()
-            m.sendNonBlockingSignalToRetrieveCh()
-
             // check if the dataHash is dataHashForEmptyTxs
             // no need to wait for syncing Data, instead prepare now and set
             // so that trySyncNextBlock can progress
@@ -72,10 +69,18 @@ func (m *Manager) SyncLoop(ctx context.Context, errCh chan<- error) {
             data := dataEvent.Data
             daHeight := dataEvent.DAHeight
             dataHash := data.DACommitment().String()
-            m.logger.Debug("data retrieved",
-                "daHeight", daHeight,
-                "hash", dataHash,
-            )
+            if data.Metadata != nil {
+                m.logger.Debug("data retrieved",
+                    "daHeight", daHeight,
+                    "hash", dataHash,
+                    "height", data.Metadata.Height,
+                )
+            } else {
+                m.logger.Debug("data retrieved",
+                    "daHeight", daHeight,
+                    "hash", dataHash,
+                )
+            }
             if m.dataCache.IsSeen(dataHash) {
                 m.logger.Debug("data already seen", "data hash", dataHash)
                 continue
@@ -109,9 +114,6 @@ func (m *Manager) SyncLoop(ctx context.Context, errCh chan<- error) {

             m.dataCache.SetItemByHash(dataHash, data)

-            m.sendNonBlockingSignalToDataStoreCh()
-            m.sendNonBlockingSignalToRetrieveCh()
-
             err = m.trySyncNextBlock(ctx, daHeight)
             if err != nil {
                 errCh <- fmt.Errorf("failed to sync next block: %w", err)
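The logging change in SyncLoop is essentially a nil guard: Metadata is only dereferenced for the height field when it is present. A small stand-alone sketch of the same guard is below, using the standard library's log/slog in place of the manager's logger; the types here are simplified placeholders, not the rollkit definitions.

package main

import (
	"log/slog"
	"os"
)

// Metadata and Data are simplified stand-ins for the types logged in SyncLoop.
type Metadata struct {
	Height uint64
}

type Data struct {
	Metadata *Metadata
}

// logDataRetrieved includes the height field only when Metadata is non-nil,
// mirroring the nil guard the hunk introduces.
func logDataRetrieved(logger *slog.Logger, data *Data, daHeight uint64, dataHash string) {
	if data.Metadata != nil {
		logger.Debug("data retrieved",
			"daHeight", daHeight,
			"hash", dataHash,
			"height", data.Metadata.Height,
		)
		return
	}
	logger.Debug("data retrieved",
		"daHeight", daHeight,
		"hash", dataHash,
	)
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
	logDataRetrieved(logger, &Data{}, 7, "abc123")                               // no metadata: height omitted
	logDataRetrieved(logger, &Data{Metadata: &Metadata{Height: 3}}, 7, "abc123") // metadata present
}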

node/full.go

Lines changed: 10 additions & 0 deletions
@@ -476,6 +476,8 @@ func (n *FullNode) Run(parentCtx context.Context) error {
         // http.ErrServerClosed is expected on graceful shutdown
         if err != nil && !errors.Is(err, http.ErrServerClosed) {
             multiErr = errors.Join(multiErr, fmt.Errorf("shutting down Prometheus server: %w", err))
+        } else {
+            n.Logger.Debug("Prometheus server shutdown context ended", "reason", err)
         }
     }

@@ -484,6 +486,8 @@ func (n *FullNode) Run(parentCtx context.Context) error {
         err = n.pprofSrv.Shutdown(shutdownCtx)
         if err != nil && !errors.Is(err, http.ErrServerClosed) {
             multiErr = errors.Join(multiErr, fmt.Errorf("shutting down pprof server: %w", err))
+        } else {
+            n.Logger.Debug("pprof server shutdown context ended", "reason", err)
         }
     }

@@ -492,17 +496,23 @@ func (n *FullNode) Run(parentCtx context.Context) error {
         err = n.rpcServer.Shutdown(shutdownCtx)
         if err != nil && !errors.Is(err, http.ErrServerClosed) {
             multiErr = errors.Join(multiErr, fmt.Errorf("shutting down RPC server: %w", err))
+        } else {
+            n.Logger.Debug("RPC server shutdown context ended", "reason", err)
         }
     }

     // Ensure Store.Close is called last to maximize chance of data flushing
     if err = n.Store.Close(); err != nil {
         multiErr = errors.Join(multiErr, fmt.Errorf("closing store: %w", err))
+    } else {
+        n.Logger.Debug("store closed")
     }

     // Save caches if needed
     if err := n.blockManager.SaveCache(); err != nil {
         multiErr = errors.Join(multiErr, fmt.Errorf("saving caches: %w", err))
+    } else {
+        n.Logger.Debug("caches saved")
     }

     // Log final status
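The full.go additions extend an existing pattern: each shutdown step either contributes to an accumulated error via errors.Join or emits a debug log confirming it completed. Below is a compact sketch of that pattern; the closeAll helper and shutdownStep type are hypothetical stand-ins for the concrete server shutdowns and Store.Close/SaveCache calls in FullNode.Run.

package main

import (
	"errors"
	"fmt"
	"log/slog"
	"os"
)

// shutdownStep is a named cleanup action, standing in for the individual
// server shutdowns and store/cache calls in FullNode.Run.
type shutdownStep struct {
	name string
	fn   func() error
}

// closeAll attempts every step, accumulating failures with errors.Join and
// logging a debug line for each step that completed cleanly.
func closeAll(logger *slog.Logger, steps []shutdownStep) error {
	var multiErr error
	for _, step := range steps {
		if err := step.fn(); err != nil {
			multiErr = errors.Join(multiErr, fmt.Errorf("%s: %w", step.name, err))
			continue
		}
		logger.Debug("shutdown step completed", "step", step.name)
	}
	return multiErr
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
	err := closeAll(logger, []shutdownStep{
		{name: "closing store", fn: func() error { return nil }},
		{name: "saving caches", fn: func() error { return errors.New("disk full") }},
	})
	fmt.Println("combined error:", err)
}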

node/full_node_integration_test.go

Lines changed: 66 additions & 25 deletions
@@ -132,7 +132,7 @@ func TestFullNodeTestSuite(t *testing.T) {
 func (s *FullNodeTestSuite) TestBlockProduction() {
     s.executor.InjectTx([]byte("test transaction"))
     err := waitForAtLeastNBlocks(s.node, 5, Store)
-    s.NoError(err, "Failed to produce second block")
+    s.NoError(err, "Failed to produce more than 5 blocks")

     // Get the current height
     height, err := s.node.Store.Height(s.ctx)
@@ -176,53 +176,94 @@ func (s *FullNodeTestSuite) TestSubmitBlocksToDA() {

 // TestTxGossipingAndAggregation tests that transactions are gossiped and blocks are aggregated and synced across multiple nodes.
 // It creates 4 nodes (1 aggregator, 3 full nodes), injects a transaction, waits for all nodes to sync, and asserts block equality.
-func (s *FullNodeTestSuite) TestTxGossipingAndAggregation() {
-    // First, stop the current node by cancelling its context
-    s.cancel()
+func TestTxGossipingAndAggregation(t *testing.T) {
+    config := getTestConfig(t, 1)

-    // Create a new context for the new node
-    s.ctx, s.cancel = context.WithCancel(context.Background())
+    numNodes := 2
+    nodes, cleanups := createNodesWithCleanup(t, numNodes, config)
+    for _, cleanup := range cleanups {
+        defer cleanup()
+    }

-    // Reset error channel
-    s.errCh = make(chan error, 1)
+    ctxs := make([]context.Context, numNodes)
+    cancelFuncs := make([]context.CancelFunc, numNodes)
+    var runningWg sync.WaitGroup

-    require := require.New(s.T())
-    config := getTestConfig(s.T(), 1)
+    // Create a context and cancel function for each node
+    for i := 0; i < numNodes; i++ {
+        ctx, cancel := context.WithCancel(context.Background())
+        ctxs[i] = ctx
+        cancelFuncs[i] = cancel
+    }

-    numNodes := 2
-    nodes, cleanups := createNodesWithCleanup(s.T(), numNodes, config)
-    defer func() {
-        for _, cleanup := range cleanups {
-            cleanup()
+    // Start only nodes[0] (aggregator) first
+    runningWg.Add(1)
+    go func(node *FullNode, ctx context.Context) {
+        defer runningWg.Done()
+        err := node.Run(ctx)
+        if err != nil && !errors.Is(err, context.Canceled) {
+            t.Logf("Error running node 0: %v", err)
         }
-    }()
+    }(nodes[0], ctxs[0])

-    s.node = nodes[0]
+    // Wait for the first block to be produced by the aggregator
+    err := waitForFirstBlock(nodes[0], Header)
+    require.NoError(t, err, "Failed to get node height")

-    // Start all nodes in background
-    for _, node := range nodes {
-        s.startNodeInBackground(node)
+    // Verify block manager is properly initialized
+    require.NotNil(t, nodes[0].blockManager, "Block manager should be initialized")
+
+    // Now start the other nodes
+    for i := 1; i < numNodes; i++ {
+        runningWg.Add(1)
+        go func(node *FullNode, ctx context.Context, idx int) {
+            defer runningWg.Done()
+            err := node.Run(ctx)
+            if err != nil && !errors.Is(err, context.Canceled) {
+                t.Logf("Error running node %d: %v", idx, err)
+            }
+        }(nodes[i], ctxs[i], i)
     }

     // Inject a transaction into the aggregator's executor
-    executor := nodes[0].blockManager.GetExecutor().(*coreexecutor.DummyExecutor)
-    executor.InjectTx([]byte("gossip tx"))
+    // executor := nodes[0].blockManager.GetExecutor().(*coreexecutor.DummyExecutor)
+    // executor.InjectTx([]byte("gossip tx"))

     // Wait for all nodes to reach at least 3 blocks
     for _, node := range nodes {
-        require.NoError(waitForAtLeastNBlocks(node, 3, Store))
+        require.NoError(t, waitForAtLeastNBlocks(node, 3, Store))
+    }
+
+    // Cancel all node contexts to signal shutdown
+    for _, cancel := range cancelFuncs {
+        cancel()
+    }
+
+    // Wait for all nodes to stop, with a timeout
+    waitCh := make(chan struct{})
+    go func() {
+        runningWg.Wait()
+        close(waitCh)
+    }()
+
+    select {
+    case <-waitCh:
+        // Nodes stopped successfully
+    case <-time.After(5 * time.Second):
+        t.Log("Warning: Not all nodes stopped gracefully within timeout")
     }

     // Assert that all nodes have the same block at height 1 and 2
     for height := uint64(1); height <= 2; height++ {
         var refHash []byte
         for i, node := range nodes {
             header, _, err := node.Store.GetBlockData(context.Background(), height)
-            require.NoError(err)
+            require.NoError(t, err)
             if i == 0 {
                 refHash = header.Hash()
             } else {
-                s.Equal(refHash, header.Hash(), "Block hash mismatch at height %d between node 0 and node %d", height, i)
+                headerHash := header.Hash()
+                require.EqualValues(t, refHash, headerHash, "Block hash mismatch at height %d between node 0 and node %d", height, i)
             }
         }
     }
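The rewritten test drives each node with its own context and goroutine, then bounds the shutdown wait with a select on a timeout channel. Here is a minimal sketch of that lifecycle; the stub run function stands in for FullNode.Run and plain prints stand in for the test assertions.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// run stands in for FullNode.Run: it blocks until its context is cancelled.
func run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	const numNodes = 2
	var wg sync.WaitGroup
	cancels := make([]context.CancelFunc, numNodes)

	// Start every node in its own goroutine with its own cancellable context.
	for i := 0; i < numNodes; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		cancels[i] = cancel
		wg.Add(1)
		go func(id int, ctx context.Context) {
			defer wg.Done()
			if err := run(ctx); err != nil && !errors.Is(err, context.Canceled) {
				fmt.Printf("node %d exited with error: %v\n", id, err)
			}
		}(i, ctx)
	}

	// ... exercise the nodes here ...

	// Signal shutdown and wait for the goroutines, but never longer than a timeout.
	for _, cancel := range cancels {
		cancel()
	}
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		fmt.Println("all nodes stopped")
	case <-time.After(5 * time.Second):
		fmt.Println("warning: not all nodes stopped within timeout")
	}
}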

node/helpers_test.go

Lines changed: 9 additions & 3 deletions
@@ -3,6 +3,7 @@ package node
 import (
     "context"
     "fmt"
+    "strings"
     "testing"
     "time"

@@ -167,12 +168,14 @@ func createNodesWithCleanup(t *testing.T, num int, config rollkitconfig.Config)

     nodes[0], cleanups[0] = aggNode.(*FullNode), cleanup
     config.Node.Aggregator = false
+    aggPeerAddress := fmt.Sprintf("%s/p2p/%s", aggListenAddress, aggPeerID.Loggable()["peerID"].(string))
+    peersList := []string{aggPeerAddress}
     for i := 1; i < num; i++ {
         ctx, cancel := context.WithCancel(context.Background())
+        config.P2P.Peers = strings.Join(peersList, ",")
         config.P2P.ListenAddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 40001+i)
-        config.RPC.Address = fmt.Sprintf("127.0.0.1:%d", 8000+i+1)
-        config.P2P.Peers = fmt.Sprintf("%s/p2p/%s", aggListenAddress, aggPeerID.Loggable()["peerID"].(string))
-        executor, sequencer, _, p2pClient, ds, _ = createTestComponents(t, config)
+        config.RPC.Address = fmt.Sprintf("127.0.0.1:%d", 8001+i)
+        executor, sequencer, _, p2pClient, _, nodeP2PKey := createTestComponents(t, config)
         node, err := NewNode(
             ctx,
             config,
@@ -194,6 +197,9 @@ func createNodesWithCleanup(t *testing.T, num int, config rollkitconfig.Config)
             cancel()
         }
         nodes[i], cleanups[i] = node.(*FullNode), cleanup
+        nodePeerID, err := peer.IDFromPrivateKey(nodeP2PKey.PrivKey)
+        require.NoError(err)
+        peersList = append(peersList, fmt.Sprintf("%s/p2p/%s", config.P2P.ListenAddress, nodePeerID.Loggable()["peerID"].(string)))
     }

     return nodes, cleanups
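The helper now keeps a running list of peer multiaddresses and hands each new node the comma-separated list of every node started before it, so later full nodes dial both the aggregator and their predecessors. A stdlib-only sketch of that bookkeeping follows; the peer ID strings are made-up placeholders for the values the real helper derives via peer.IDFromPrivateKey.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// The aggregator's multiaddress seeds the list, as in createNodesWithCleanup.
	// The peer IDs are placeholders, not real libp2p identities.
	peersList := []string{"/ip4/127.0.0.1/tcp/40000/p2p/AggregatorPeerIDPlaceholder"}

	for i := 1; i < 3; i++ {
		listenAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 40001+i)

		// Each new node is configured to dial every node started before it.
		peers := strings.Join(peersList, ",")
		fmt.Printf("node %d listens on %s and dials %s\n", i, listenAddr, peers)

		// After the node is created, append its own multiaddress so that
		// the next node dials it too.
		peersList = append(peersList, fmt.Sprintf("%s/p2p/NodePeerIDPlaceholder%d", listenAddr, i))
	}
}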
