Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions storage/grpc_reader_multi_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
manager.wg.Wait()
return nil, manager.permanentErr
}
mrd.Attrs = manager.attrs
if manager.attrs != nil {
mrd.Attrs = *manager.attrs
}
return mrd, nil
case <-ctx.Done():
cancel()
Expand Down Expand Up @@ -220,7 +222,7 @@ type multiRangeDownloaderManager struct {
waiters []chan struct{}
readSpec *storagepb.BidiReadObjectSpec
lastReadHandle []byte
attrs ReaderObjectAttrs
attrs *ReaderObjectAttrs
attrsReady chan struct{}
attrsOnce sync.Once
spanCtx context.Context
Expand Down Expand Up @@ -487,8 +489,8 @@ func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrd
}
m.readIDCounter++

// Attributes should be ready if we are processing Add commands
if req.offset < 0 {
// Convert to positive offset only if attributes are available.
if m.attrs != nil && req.offset < 0 {
err := m.convertToPositiveOffset(req)
if err != nil {
return
Expand Down Expand Up @@ -517,23 +519,20 @@ func (m *multiRangeDownloaderManager) convertToPositiveOffset(req *rangeRequest)
if req.offset >= 0 {
return nil
}
objSize := m.attrs.Size
if objSize <= 0 {
err := errors.New("storage: cannot resolve negative offset without object size")
m.failRange(req, err)
return err
var objSize int64
if m.attrs != nil {
objSize = m.attrs.Size
}
if req.length != 0 {
err := fmt.Errorf("storage: negative offset with non-zero length is not supported (offset: %d, length: %d)", req.origOffset, req.origLength)
if objSize <= 0 {
err := errors.New("storage: cannot resolve negative offset with object size as 0")
m.failRange(req, err)
return err
}
start := objSize + req.offset
if start < 0 {
start = 0
}
start := max(objSize+req.offset, 0)
req.offset = start
req.length = objSize - start
if req.length == 0 {
req.length = objSize - start
}
return nil
}

Expand Down Expand Up @@ -573,15 +572,12 @@ func (m *multiRangeDownloaderManager) processSessionResult(result mrdSessionResu
if meta := resp.GetMetadata(); meta != nil {
obj := newObjectFromProto(meta)
attrs := readerAttrsFromObject(obj)
m.attrs = attrs

m.attrs = &attrs
for _, req := range m.pendingRanges {
if req.offset < 0 {
_ = m.convertToPositiveOffset(req)
}
}
} else {
m.handleStreamEnd(mrdSessionResult{err: errors.New("storage: first response from BidiReadObject stream missing metadata")})
}
})

Expand Down
35 changes: 30 additions & 5 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,10 @@ func TestIntegration_MRDCallbackReturnsDataLength(t *testing.T) {
func TestIntegration_MRDWithReadHandle(t *testing.T) {
multiTransportTest(skipAllButZonal(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
const (
dataSize = 1000
offset = 0
limit = 100
dataSize = 1000
offset = 0
limit = 100
negativeOffset = -200
)

// Generate random content for testing.
Expand Down Expand Up @@ -562,7 +563,7 @@ func TestIntegration_MRDWithReadHandle(t *testing.T) {
}

// Perform the read operation.
var res1, res2 multiRangeDownloaderOutput
var res1, res2, res3, res4 multiRangeDownloaderOutput
mrd.Add(&res1.buf, offset, limit, func(x, y int64, err error) {
res1.offset = x
res1.limit = y
Expand All @@ -573,14 +574,28 @@ func TestIntegration_MRDWithReadHandle(t *testing.T) {
res2.limit = y
res2.err = err
})
mrd2.Add(&res3.buf, negativeOffset, limit, func(x, y int64, err error) {
res3.offset = x
res3.limit = y
res3.err = err
})
mrd2.Add(&res4.buf, negativeOffset, 0, func(x, y int64, err error) {
res4.offset = x
res4.limit = y
res4.err = err
})

mrd.Wait()
mrd2.Wait()

if res1.err != nil {
t.Fatalf("mrd.Add callback returned error: %v", res1.err)
}
if res2.err != nil {
t.Fatalf("mrd2.Add callback returned error: %v", res2.err)
t.Fatalf("mrd2.Add callback returned error for res2: %v", res2.err)
}
if res3.err != nil {
t.Fatalf("mrd2.Add callback returned error for res3: %v", res3.err)
}

// Validate results for mrd with read handle.
Expand All @@ -593,6 +608,16 @@ func TestIntegration_MRDWithReadHandle(t *testing.T) {
t.Errorf("mrd2 downloaded content mismatch. got %d bytes, want %d bytes", len(got), len(want))
}

want = content[max(0, dataSize+negativeOffset):min(dataSize, max(0, dataSize+negativeOffset)+limit)]
if got := res3.buf.Bytes(); !bytes.Equal(got, want) {
t.Errorf("mrd2 downloaded content mismatch. got %v bytes, want %v bytes. %v", got, want, content)
}

want = content[max(0, dataSize+negativeOffset):]
if got := res4.buf.Bytes(); !bytes.Equal(got, want) {
t.Errorf("mrd2 downloaded content mismatch. got %v bytes, want %v bytes. %v", got, want, content)
}

if err := mrd.Close(); err != nil {
t.Fatalf("Error while closing reader: %v", err)
}
Expand Down
Loading