Skip to content

Commit c989f8f

Browse files
authored
Merge pull request #1627 from giuseppe/chunked-store-tarsplit
chunked: generate tar-split as part of zstd:chunked
2 parents cc0d208 + 032aae3 commit c989f8f

13 files changed

Lines changed: 345 additions & 107 deletions

File tree

drivers/btrfs/btrfs.go

Lines changed: 3 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -628,18 +628,13 @@ func (d *Driver) Get(id string, options graphdriver.MountOpts) (string, error) {
628628
if err != nil {
629629
return "", err
630630
}
631-
switch len(options.Options) {
632-
case 0:
633-
case 1:
634-
if options.Options[0] == "ro" {
631+
for _, opt := range options.Options {
632+
if opt == "ro" {
635633
// ignore "ro" option
636-
break
634+
continue
637635
}
638-
fallthrough
639-
default:
640636
return "", fmt.Errorf("btrfs driver does not support mount options")
641637
}
642-
643638
if !st.IsDir() {
644639
return "", fmt.Errorf("%s: not a directory", dir)
645640
}

drivers/driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ type DriverWithDifferOutput struct {
187187
UncompressedDigest digest.Digest
188188
Metadata string
189189
BigData map[string][]byte
190+
TarSplit []byte
191+
TOCDigest digest.Digest
190192
}
191193

192194
// Differ defines the interface for using a custom differ.

drivers/fsdiff.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func (gdw *NaiveDiffDriver) Diff(id string, idMappings *idtools.IDMappings, pare
5555

5656
options := MountOpts{
5757
MountLabel: mountLabel,
58+
Options: []string{"ro"},
5859
}
5960
layerFs, err := driver.Get(id, options)
6061
if err != nil {

drivers/overlay/overlay.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1952,6 +1952,9 @@ func (d *Driver) ApplyDiffFromStagingDirectory(id, parent, stagingDirectory stri
19521952
if err := os.RemoveAll(diff); err != nil && !os.IsNotExist(err) {
19531953
return err
19541954
}
1955+
1956+
diffOutput.UncompressedDigest = diffOutput.TOCDigest
1957+
19551958
return os.Rename(stagingDirectory, diff)
19561959
}
19571960

drivers/vfs/driver.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,12 @@ func (d *Driver) Remove(id string) error {
226226
// Get returns the directory for the given id.
227227
func (d *Driver) Get(id string, options graphdriver.MountOpts) (_ string, retErr error) {
228228
dir := d.dir(id)
229-
switch len(options.Options) {
230-
case 0:
231-
case 1:
232-
if options.Options[0] == "ro" {
229+
230+
for _, opt := range options.Options {
231+
if opt == "ro" {
233232
// ignore "ro" option
234-
break
233+
continue
235234
}
236-
fallthrough
237-
default:
238235
return "", fmt.Errorf("vfs driver does not support mount options")
239236
}
240237
if st, err := os.Stat(dir); err != nil {

layers.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2392,8 +2392,26 @@ func (r *layerStore) ApplyDiffFromStagingDirectory(id, stagingDirectory string,
23922392
layer.UncompressedDigest = diffOutput.UncompressedDigest
23932393
layer.UncompressedSize = diffOutput.Size
23942394
layer.Metadata = diffOutput.Metadata
2395-
if err = r.saveFor(layer); err != nil {
2396-
return err
2395+
if len(diffOutput.TarSplit) != 0 {
2396+
tsdata := bytes.Buffer{}
2397+
compressor, err := pgzip.NewWriterLevel(&tsdata, pgzip.BestSpeed)
2398+
if err != nil {
2399+
compressor = pgzip.NewWriter(&tsdata)
2400+
}
2401+
if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that
2402+
logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err)
2403+
}
2404+
if _, err := compressor.Write(diffOutput.TarSplit); err != nil {
2405+
compressor.Close()
2406+
return err
2407+
}
2408+
compressor.Close()
2409+
if err := os.MkdirAll(filepath.Dir(r.tspath(layer.ID)), 0o700); err != nil {
2410+
return err
2411+
}
2412+
if err := ioutils.AtomicWriteFile(r.tspath(layer.ID), tsdata.Bytes(), 0o600); err != nil {
2413+
return err
2414+
}
23972415
}
23982416
for k, v := range diffOutput.BigData {
23992417
if err := r.SetBigData(id, k, bytes.NewReader(v)); err != nil {
@@ -2403,6 +2421,9 @@ func (r *layerStore) ApplyDiffFromStagingDirectory(id, stagingDirectory string,
24032421
return err
24042422
}
24052423
}
2424+
if err = r.saveFor(layer); err != nil {
2425+
return err
2426+
}
24062427
return err
24072428
}
24082429

pkg/chunked/cache_linux.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -516,14 +516,14 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
516516

517517
iter := jsoniter.ParseBytes(jsoniter.ConfigFastest, manifest)
518518
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
519-
if field != "entries" {
519+
if strings.ToLower(field) != "entries" {
520520
iter.Skip()
521521
continue
522522
}
523523
for iter.ReadArray() {
524524
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
525-
switch field {
526-
case "type", "name", "linkName", "digest", "chunkDigest", "chunkType":
525+
switch strings.ToLower(field) {
526+
case "type", "name", "linkname", "digest", "chunkdigest", "chunktype", "modtime", "accesstime", "changetime":
527527
count += len(iter.ReadStringAsSlice())
528528
case "xattrs":
529529
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
@@ -548,33 +548,33 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
548548

549549
iter = jsoniter.ParseBytes(jsoniter.ConfigFastest, manifest)
550550
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
551-
if field == "version" {
551+
if strings.ToLower(field) == "version" {
552552
toc.Version = iter.ReadInt()
553553
continue
554554
}
555-
if field != "entries" {
555+
if strings.ToLower(field) != "entries" {
556556
iter.Skip()
557557
continue
558558
}
559559
for iter.ReadArray() {
560560
var m internal.FileMetadata
561561
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
562-
switch field {
562+
switch strings.ToLower(field) {
563563
case "type":
564564
m.Type = getString(iter.ReadStringAsSlice())
565565
case "name":
566566
m.Name = getString(iter.ReadStringAsSlice())
567-
case "linkName":
567+
case "linkname":
568568
m.Linkname = getString(iter.ReadStringAsSlice())
569569
case "mode":
570570
m.Mode = iter.ReadInt64()
571571
case "size":
572572
m.Size = iter.ReadInt64()
573-
case "UID":
573+
case "uid":
574574
m.UID = iter.ReadInt()
575-
case "GID":
575+
case "gid":
576576
m.GID = iter.ReadInt()
577-
case "ModTime":
577+
case "modtime":
578578
time, err := time.Parse(time.RFC3339, byteSliceAsString(iter.ReadStringAsSlice()))
579579
if err != nil {
580580
return nil, err
@@ -592,23 +592,23 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
592592
return nil, err
593593
}
594594
m.ChangeTime = &time
595-
case "devMajor":
595+
case "devmajor":
596596
m.Devmajor = iter.ReadInt64()
597-
case "devMinor":
597+
case "devminor":
598598
m.Devminor = iter.ReadInt64()
599599
case "digest":
600600
m.Digest = getString(iter.ReadStringAsSlice())
601601
case "offset":
602602
m.Offset = iter.ReadInt64()
603-
case "endOffset":
603+
case "endoffset":
604604
m.EndOffset = iter.ReadInt64()
605-
case "chunkSize":
605+
case "chunksize":
606606
m.ChunkSize = iter.ReadInt64()
607-
case "chunkOffset":
607+
case "chunkoffset":
608608
m.ChunkOffset = iter.ReadInt64()
609-
case "chunkDigest":
609+
case "chunkdigest":
610610
m.ChunkDigest = getString(iter.ReadStringAsSlice())
611-
case "chunkType":
611+
case "chunktype":
612612
m.ChunkType = getString(iter.ReadStringAsSlice())
613613
case "xattrs":
614614
m.Xattrs = make(map[string]string)

pkg/chunked/compression_linux.go

Lines changed: 86 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -150,22 +150,32 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
150150
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream. The blob total size must
151151
// be specified.
152152
// This function uses the io.github.containers.zstd-chunked. annotations when specified.
153-
func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, int64, error) {
153+
func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, []byte, int64, error) {
154154
footerSize := int64(internal.FooterSizeSupported)
155155
if blobSize <= footerSize {
156-
return nil, 0, errors.New("blob too small")
156+
return nil, nil, 0, errors.New("blob too small")
157157
}
158158

159159
manifestChecksumAnnotation := annotations[internal.ManifestChecksumKey]
160160
if manifestChecksumAnnotation == "" {
161-
return nil, 0, fmt.Errorf("manifest checksum annotation %q not found", internal.ManifestChecksumKey)
161+
return nil, nil, 0, fmt.Errorf("manifest checksum annotation %q not found", internal.ManifestChecksumKey)
162162
}
163163

164164
var offset, length, lengthUncompressed, manifestType uint64
165165

166+
var offsetTarSplit, lengthTarSplit, lengthUncompressedTarSplit uint64
167+
tarSplitChecksumAnnotation := ""
168+
166169
if offsetMetadata := annotations[internal.ManifestInfoKey]; offsetMetadata != "" {
167170
if _, err := fmt.Sscanf(offsetMetadata, "%d:%d:%d:%d", &offset, &length, &lengthUncompressed, &manifestType); err != nil {
168-
return nil, 0, err
171+
return nil, nil, 0, err
172+
}
173+
174+
if tarSplitInfoKeyAnnotation, found := annotations[internal.TarSplitInfoKey]; found {
175+
if _, err := fmt.Sscanf(tarSplitInfoKeyAnnotation, "%d:%d:%d", &offsetTarSplit, &lengthTarSplit, &lengthUncompressedTarSplit); err != nil {
176+
return nil, nil, 0, err
177+
}
178+
tarSplitChecksumAnnotation = annotations[internal.TarSplitChecksumKey]
169179
}
170180
} else {
171181
chunk := ImageSourceChunk{
@@ -174,87 +184,126 @@ func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable
174184
}
175185
parts, errs, err := blobStream.GetBlobAt([]ImageSourceChunk{chunk})
176186
if err != nil {
177-
return nil, 0, err
187+
return nil, nil, 0, err
178188
}
179189
var reader io.ReadCloser
180190
select {
181191
case r := <-parts:
182192
reader = r
183193
case err := <-errs:
184-
return nil, 0, err
194+
return nil, nil, 0, err
185195
}
186196
footer := make([]byte, footerSize)
187197
if _, err := io.ReadFull(reader, footer); err != nil {
188-
return nil, 0, err
198+
return nil, nil, 0, err
189199
}
190200

191201
offset = binary.LittleEndian.Uint64(footer[0:8])
192202
length = binary.LittleEndian.Uint64(footer[8:16])
193203
lengthUncompressed = binary.LittleEndian.Uint64(footer[16:24])
194204
manifestType = binary.LittleEndian.Uint64(footer[24:32])
195-
if !isZstdChunkedFrameMagic(footer[32:40]) {
196-
return nil, 0, errors.New("invalid magic number")
205+
if !isZstdChunkedFrameMagic(footer[48:56]) {
206+
return nil, nil, 0, errors.New("invalid magic number")
197207
}
198208
}
199209

200210
if manifestType != internal.ManifestTypeCRFS {
201-
return nil, 0, errors.New("invalid manifest type")
211+
return nil, nil, 0, errors.New("invalid manifest type")
202212
}
203213

204214
// set a reasonable limit
205215
if length > (1<<20)*50 {
206-
return nil, 0, errors.New("manifest too big")
216+
return nil, nil, 0, errors.New("manifest too big")
207217
}
208218
if lengthUncompressed > (1<<20)*50 {
209-
return nil, 0, errors.New("manifest too big")
219+
return nil, nil, 0, errors.New("manifest too big")
210220
}
211221

212222
chunk := ImageSourceChunk{
213223
Offset: offset,
214224
Length: length,
215225
}
216226

217-
parts, errs, err := blobStream.GetBlobAt([]ImageSourceChunk{chunk})
227+
chunks := []ImageSourceChunk{chunk}
228+
229+
if offsetTarSplit > 0 {
230+
chunkTarSplit := ImageSourceChunk{
231+
Offset: offsetTarSplit,
232+
Length: lengthTarSplit,
233+
}
234+
chunks = append(chunks, chunkTarSplit)
235+
}
236+
237+
parts, errs, err := blobStream.GetBlobAt(chunks)
218238
if err != nil {
219-
return nil, 0, err
239+
return nil, nil, 0, err
220240
}
221-
var reader io.ReadCloser
222-
select {
223-
case r := <-parts:
224-
reader = r
225-
case err := <-errs:
226-
return nil, 0, err
241+
242+
readBlob := func(len uint64) ([]byte, error) {
243+
var reader io.ReadCloser
244+
select {
245+
case r := <-parts:
246+
reader = r
247+
case err := <-errs:
248+
return nil, err
249+
}
250+
251+
blob := make([]byte, len)
252+
if _, err := io.ReadFull(reader, blob); err != nil {
253+
reader.Close()
254+
return nil, err
255+
}
256+
if err := reader.Close(); err != nil {
257+
return nil, err
258+
}
259+
return blob, nil
227260
}
228261

229-
manifest := make([]byte, length)
230-
if _, err := io.ReadFull(reader, manifest); err != nil {
231-
return nil, 0, err
262+
manifest, err := readBlob(length)
263+
if err != nil {
264+
return nil, nil, 0, err
232265
}
233266

234-
manifestDigester := digest.Canonical.Digester()
235-
manifestChecksum := manifestDigester.Hash()
236-
if _, err := manifestChecksum.Write(manifest); err != nil {
237-
return nil, 0, err
267+
decodedBlob, err := decodeAndValidateBlob(manifest, lengthUncompressed, manifestChecksumAnnotation)
268+
if err != nil {
269+
return nil, nil, 0, err
270+
}
271+
decodedTarSplit := []byte{}
272+
if offsetTarSplit > 0 {
273+
tarSplit, err := readBlob(lengthTarSplit)
274+
if err != nil {
275+
return nil, nil, 0, err
276+
}
277+
278+
decodedTarSplit, err = decodeAndValidateBlob(tarSplit, lengthUncompressedTarSplit, tarSplitChecksumAnnotation)
279+
if err != nil {
280+
return nil, nil, 0, err
281+
}
238282
}
283+
return decodedBlob, decodedTarSplit, int64(offset), err
284+
}
239285

240-
d, err := digest.Parse(manifestChecksumAnnotation)
286+
func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedUncompressedChecksum string) ([]byte, error) {
287+
d, err := digest.Parse(expectedUncompressedChecksum)
241288
if err != nil {
242-
return nil, 0, err
289+
return nil, err
243290
}
244-
if manifestDigester.Digest() != d {
245-
return nil, 0, errors.New("invalid manifest checksum")
291+
292+
blobDigester := d.Algorithm().Digester()
293+
blobChecksum := blobDigester.Hash()
294+
if _, err := blobChecksum.Write(blob); err != nil {
295+
return nil, err
296+
}
297+
if blobDigester.Digest() != d {
298+
return nil, fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
246299
}
247300

248301
decoder, err := zstd.NewReader(nil) //nolint:contextcheck
249302
if err != nil {
250-
return nil, 0, err
303+
return nil, err
251304
}
252305
defer decoder.Close()
253306

254307
b := make([]byte, 0, lengthUncompressed)
255-
if decoded, err := decoder.DecodeAll(manifest, b); err == nil {
256-
return decoded, int64(offset), nil
257-
}
258-
259-
return manifest, int64(offset), nil
308+
return decoder.DecodeAll(blob, b)
260309
}

0 commit comments

Comments
 (0)