Skip to content

Commit 21d5232

Browse files
feat: feed wrapping (#469)
* feat: feed wrapping * fix: download root chunk after delay * fix(manifest): improve error handling * fix(feed): improve error handling * chore(manifest): fix spelling error * fix: download file address instead of manifest * fix: init chunk with span * fix: init chunk with span --------- Co-authored-by: nugaon <nugaon1@gmail.com> Co-authored-by: Ljubisa Gacevic <ljubisa.rs@gmail.com>
1 parent 75e5c6c commit 21d5232

4 files changed

Lines changed: 112 additions & 63 deletions

File tree

pkg/bee/api/feed.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ import (
88
"io"
99
"net/http"
1010
"strconv"
11-
"time"
1211

13-
"github.com/ethersphere/bee/v2/pkg/cac"
1412
"github.com/ethersphere/bee/v2/pkg/crypto"
1513
"github.com/ethersphere/bee/v2/pkg/soc"
1614
"github.com/ethersphere/bee/v2/pkg/swarm"
@@ -69,14 +67,8 @@ func (f *FeedService) CreateRootManifest(ctx context.Context, signer crypto.Sign
6967
return &response, nil
7068
}
7169

72-
// UpdateWithReference updates a feed with a reference
73-
func (f *FeedService) UpdateWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o UploadOptions) (*SocResponse, error) {
74-
ts := make([]byte, 8)
75-
binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix()))
76-
ch, err := cac.New(append(append([]byte{}, ts...), addr.Bytes()...))
77-
if err != nil {
78-
return nil, err
79-
}
70+
// UpdateWithRootChunk updates a feed with a root chunk
71+
func (f *FeedService) UpdateWithRootChunk(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, ch swarm.Chunk, o UploadOptions) (*SocResponse, error) {
8072
ownerHex, err := ownerFromSigner(signer)
8173
if err != nil {
8274
return nil, err

pkg/bee/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -910,14 +910,14 @@ func (c *Client) UploadCollection(ctx context.Context, f *File, o api.UploadOpti
910910
func (c *Client) DownloadManifestFile(ctx context.Context, a swarm.Address, path string) (size int64, hash []byte, err error) {
911911
r, err := c.api.Dirs.Download(ctx, a, path)
912912
if err != nil {
913-
return 0, nil, fmt.Errorf("download manifest file %s: %w", path, err)
913+
return 0, nil, fmt.Errorf("download manifest file `%s`: %w", path, err)
914914
}
915915
defer r.Close()
916916

917917
h := fileHasher()
918918
size, err = io.Copy(h, r)
919919
if err != nil {
920-
return 0, nil, fmt.Errorf("download manifest file %s: %w", path, err)
920+
return 0, nil, fmt.Errorf("download manifest file `%s`: %w", path, err)
921921
}
922922

923923
return size, h.Sum(nil), nil
@@ -1009,9 +1009,9 @@ func (c *Client) CreateRootFeedManifest(ctx context.Context, signer crypto.Signe
10091009
return c.api.Feed.CreateRootManifest(ctx, signer, topic, o)
10101010
}
10111011

1012-
// UpdateFeedWithReference updates a feed with a reference
1013-
func (c *Client) UpdateFeedWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o api.UploadOptions) (*api.SocResponse, error) {
1014-
return c.api.Feed.UpdateWithReference(ctx, signer, topic, i, addr, o)
1012+
// UpdateFeedWithRootChunk updates a feed with a root chunk
1013+
func (c *Client) UpdateFeedWithRootChunk(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, ch swarm.Chunk, o api.UploadOptions) (*api.SocResponse, error) {
1014+
return c.api.Feed.UpdateWithRootChunk(ctx, signer, topic, i, ch, o)
10151015
}
10161016

10171017
// FindFeedUpdate finds the latest update for a feed

pkg/check/feed/feed.go

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/ethersphere/bee/v2/pkg/cac"
910
"github.com/ethersphere/bee/v2/pkg/crypto"
1011
"github.com/ethersphere/bee/v2/pkg/swarm"
1112
"github.com/ethersphere/beekeeper/pkg/bee"
@@ -58,9 +59,18 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
5859

5960
if o.RootRef != "" {
6061
c.logger.Infof("running availability check")
61-
return c.checkAvailability(ctx, cluster, o)
62+
if err := c.checkAvailability(ctx, cluster, o); err != nil {
63+
return fmt.Errorf("availability check: %w", err)
64+
}
65+
return nil
66+
}
67+
68+
c.logger.Infof("running feed check")
69+
if err := c.feedCheck(ctx, cluster, o); err != nil {
70+
return fmt.Errorf("feed check: %w", err)
6271
}
63-
return c.feedCheck(ctx, cluster, o)
72+
73+
return nil
6474
}
6575

6676
func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Cluster, o Options) error {
@@ -69,70 +79,71 @@ func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Clu
6979
return fmt.Errorf("invalid root ref: %w", err)
7080
}
7181

72-
nodeNames := cluster.FullNodeNames()
73-
nodeName := nodeNames[0]
74-
clients, err := cluster.NodesClients(ctx)
82+
clients, err := cluster.ShuffledFullNodeClients(ctx, random.PseudoGenerator(time.Now().UnixNano()))
7583
if err != nil {
76-
return err
84+
return fmt.Errorf("node clients: %w", err)
85+
}
86+
87+
if len(clients) < 1 {
88+
return fmt.Errorf("availability check requires at least 1 full node")
7789
}
7890

79-
client := clients[nodeName]
80-
_, _, err = client.DownloadFile(ctx, ref, nil)
91+
_, _, err = clients[0].DownloadFile(ctx, ref, nil)
8192
if err != nil {
82-
return err
93+
return fmt.Errorf("download root feed: %w", err)
8394
}
95+
8496
return nil
8597
}
8698

8799
// feedCheck creates a root feed manifest, makes a series of updates to the feed
88100
// and verifies that the updates are retrievable via another node.
89101
func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o Options) error {
90-
rnd := random.PseudoGenerator(time.Now().UnixNano())
91-
names := cluster.FullNodeNames()
92-
perm := rnd.Perm(len(names))
93-
94-
if len(names) < 2 {
95-
return fmt.Errorf("not enough nodes to run feed check")
102+
clients, err := cluster.ShuffledFullNodeClients(ctx, random.PseudoGenerator(time.Now().UnixNano()))
103+
if err != nil {
104+
return fmt.Errorf("node clients: %w", err)
96105
}
97106

98-
clients, err := cluster.NodesClients(ctx)
99-
if err != nil {
100-
return err
107+
if len(clients) < 2 {
108+
return fmt.Errorf("feed check requires at least 2 full nodes")
101109
}
102-
upClient := clients[names[perm[0]]]
103-
downClient := clients[names[perm[1]]]
110+
111+
upClient := clients[0]
112+
downClient := clients[1]
104113

105114
c.logger.Infof("upload client: %s", upClient.Name())
106115

107116
batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
108117
if err != nil {
109-
return err
118+
return fmt.Errorf("get or create mutable batch: %w", err)
110119
}
111120

112121
privKey, err := crypto.GenerateSecp256k1Key()
113122
if err != nil {
114-
return err
123+
return fmt.Errorf("generate private key: %w", err)
115124
}
116125

117126
signer := crypto.NewDefaultSigner(privKey)
118127
topic, err := crypto.LegacyKeccak256([]byte("my-topic"))
119128
if err != nil {
120-
return err
129+
return fmt.Errorf("topic hash: %w", err)
121130
}
122131

123132
// create root
124133
createManifestRes, err := upClient.CreateRootFeedManifest(ctx, signer, topic, api.UploadOptions{BatchID: batchID})
125134
if err != nil {
126135
return err
127136
}
137+
128138
c.logger.Infof("node %s: manifest created", upClient.Name())
129139
c.logger.Infof("reference: %s", createManifestRes.Reference)
130140
c.logger.Infof("owner: %s", createManifestRes.Owner)
131141
c.logger.Infof("topic: %s", createManifestRes.Topic)
132142

133143
// make updates
134-
for i := 0; i < o.NUpdates; i++ {
144+
for i := range o.NUpdates {
135145
time.Sleep(3 * time.Second)
146+
136147
data := fmt.Sprintf("update-%d", i)
137148
fName := fmt.Sprintf("file-%d", i)
138149
file := bee.NewBufferFile(fName, bytes.NewBuffer([]byte(data)))
@@ -141,13 +152,26 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o
141152
Direct: true,
142153
})
143154
if err != nil {
144-
return err
155+
return fmt.Errorf("upload file `%s`: %w", fName, err)
156+
}
157+
158+
// download root chunk of file
159+
rChData, err := upClient.DownloadChunk(ctx, file.Address(), "", nil)
160+
if err != nil {
161+
return fmt.Errorf("download root chunk: %w", err)
145162
}
146-
ref := file.Address()
147-
socRes, err := upClient.UpdateFeedWithReference(ctx, signer, topic, uint64(i), ref, api.UploadOptions{BatchID: batchID})
163+
164+
// make chunk from byte array rChData
165+
rCh, err := cac.NewWithDataSpan(rChData)
166+
if err != nil {
167+
return fmt.Errorf("create chunk: %w", err)
168+
}
169+
170+
socRes, err := upClient.UpdateFeedWithRootChunk(ctx, signer, topic, uint64(i), rCh, api.UploadOptions{BatchID: batchID})
148171
if err != nil {
149-
return err
172+
return fmt.Errorf("update feed with root chunk: %w", err)
150173
}
174+
151175
c.logger.Infof("node %s: feed updated", upClient.Name())
152176
c.logger.Infof("soc reference: %s", socRes.Reference)
153177
c.logger.Infof("wrapped reference: %s", file.Address())
@@ -159,7 +183,7 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o
159183
c.logger.Infof("download client: %s", downClient.Name())
160184
update, err := downClient.FindFeedUpdate(ctx, signer, topic, nil)
161185
if err != nil {
162-
return err
186+
return fmt.Errorf("find feed update: %w", err)
163187
}
164188

165189
c.logger.Infof("node %s: feed update found", downClient.Name())
@@ -175,9 +199,11 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o
175199
if err != nil {
176200
return fmt.Errorf("download root feed: %w", err)
177201
}
202+
178203
lastUpdateData := fmt.Sprintf("update-%d", o.NUpdates-1)
179204
if string(d) != lastUpdateData {
180205
return fmt.Errorf("expected file content to be %s, got %s", lastUpdateData, string(d))
181206
}
207+
182208
return nil
183209
}

0 commit comments

Comments
 (0)