65 changes: 32 additions & 33 deletions lib/logstorage/block_stream_reader.go
@@ -9,6 +9,7 @@
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
@@ -95,10 +96,9 @@
return r.bloom.bytesRead + r.values.bytesRead
}

func (r *bloomValuesReader) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
dst = append(dst, &r.bloom)
dst = append(dst, &r.values)
return dst
func (r *bloomValuesReader) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(&r.bloom))
pe.Add(fs.NewCloserTask(&r.values))
}

type bloomValuesStreamReader struct {
@@ -181,23 +181,22 @@
func (sr *streamReaders) MustClose() {
// Close files in parallel in order to reduce the time needed for this operation
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
&sr.columnNamesReader,
&sr.columnIdxsReader,
&sr.metaindexReader,
&sr.indexReader,
&sr.columnsHeaderIndexReader,
&sr.columnsHeaderReader,
&sr.timestampsReader,
}

cs = sr.messageBloomValuesReader.appendClosers(cs)
cs = sr.oldBloomValuesReader.appendClosers(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(&sr.columnNamesReader))
pe.Add(fs.NewCloserTask(&sr.columnIdxsReader))
pe.Add(fs.NewCloserTask(&sr.metaindexReader))
pe.Add(fs.NewCloserTask(&sr.indexReader))
pe.Add(fs.NewCloserTask(&sr.columnsHeaderIndexReader))
pe.Add(fs.NewCloserTask(&sr.columnsHeaderReader))
pe.Add(fs.NewCloserTask(&sr.timestampsReader))

sr.messageBloomValuesReader.appendCloserTasks(&pe)
sr.oldBloomValuesReader.appendCloserTasks(&pe)
for i := range sr.bloomValuesShards {
cs = sr.bloomValuesShards[i].appendClosers(cs)
sr.bloomValuesShards[i].appendCloserTasks(&pe)
}

fs.MustCloseParallel(cs)
pe.Run()
}

func (sr *streamReaders) getBloomValuesReaderForColumnName(name string) *bloomValuesReader {
@@ -355,63 +354,63 @@
// Open data readers in parallel in order to reduce the time for this operation
// on high-latency storage systems such as NFS or Ceph.

var pfo filestream.ParallelFileOpener
var pe fsutil.ParallelExecutor

var columnNamesReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
pfo.Add(columnNamesPath, &columnNamesReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnNamesPath, &columnNamesReader, nocache))
}

var columnIdxsReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 3 {
pfo.Add(columnIdxsPath, &columnIdxsReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnIdxsPath, &columnIdxsReader, nocache))
}

var metaindexReader filestream.ReadCloser
pfo.Add(metaindexPath, &metaindexReader, nocache)
pe.Add(filestream.NewFileOpenerTask(metaindexPath, &metaindexReader, nocache))

var indexReader filestream.ReadCloser
pfo.Add(indexPath, &indexReader, nocache)
pe.Add(filestream.NewFileOpenerTask(indexPath, &indexReader, nocache))

var columnsHeaderIndexReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
pfo.Add(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache))
}

var columnsHeaderReader filestream.ReadCloser
pfo.Add(columnsHeaderPath, &columnsHeaderReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnsHeaderPath, &columnsHeaderReader, nocache))

var timestampsReader filestream.ReadCloser
pfo.Add(timestampsPath, &timestampsReader, nocache)
pe.Add(filestream.NewFileOpenerTask(timestampsPath, &timestampsReader, nocache))

messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
var messageBloomValuesReader bloomValuesStreamReader
pfo.Add(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache)
pfo.Add(messageValuesPath, &messageBloomValuesReader.values, nocache)
pe.Add(filestream.NewFileOpenerTask(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache))
pe.Add(filestream.NewFileOpenerTask(messageValuesPath, &messageBloomValuesReader.values, nocache))

var oldBloomValuesReader bloomValuesStreamReader
var bloomValuesShards []bloomValuesStreamReader
if bsr.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
pfo.Add(bloomPath, &oldBloomValuesReader.bloom, nocache)
pe.Add(filestream.NewFileOpenerTask(bloomPath, &oldBloomValuesReader.bloom, nocache))

valuesPath := filepath.Join(path, oldValuesFilename)
pfo.Add(valuesPath, &oldBloomValuesReader.values, nocache)
pe.Add(filestream.NewFileOpenerTask(valuesPath, &oldBloomValuesReader.values, nocache))
} else {
bloomValuesShards = make([]bloomValuesStreamReader, bsr.ph.BloomValuesShardsCount)
for i := range bloomValuesShards {
shard := &bloomValuesShards[i]

bloomPath := getBloomFilePath(path, uint64(i))
pfo.Add(bloomPath, &shard.bloom, nocache)
pe.Add(filestream.NewFileOpenerTask(bloomPath, &shard.bloom, nocache))

valuesPath := getValuesFilePath(path, uint64(i))
pfo.Add(valuesPath, &shard.values, nocache)
pe.Add(filestream.NewFileOpenerTask(valuesPath, &shard.values, nocache))
}
}

pfo.Run()
pe.Run()

// Initialize streamReaders
bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
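Note: the Build and Analyze CI jobs flag undefined: fsutil.ParallelExecutor and undefined: fs.NewCloserTask throughout this diff, so those helpers presumably land in a commit that is not part of this view. A minimal sketch of the executor API the new code assumes (Add queues a task, Run executes all queued tasks concurrently and waits for them to finish) could look like the following; the plain func() task type is inferred from the call sites, not confirmed by this diff:

package fsutil

import "sync"

// ParallelExecutor runs queued tasks concurrently.
// The zero value is ready for use.
type ParallelExecutor struct {
	tasks []func()
}

// Add queues task for execution by Run.
func (pe *ParallelExecutor) Add(task func()) {
	pe.tasks = append(pe.tasks, task)
}

// Run executes all the queued tasks concurrently and waits until they complete.
func (pe *ParallelExecutor) Run() {
	var wg sync.WaitGroup
	for _, task := range pe.tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			task()
		}(task)
	}
	wg.Wait()
	pe.tasks = nil
}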
55 changes: 27 additions & 28 deletions lib/logstorage/block_stream_writer.go
@@ -7,6 +7,7 @@
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
@@ -84,10 +85,9 @@
return w.bloom.bytesWritten + w.values.bytesWritten
}

func (w *bloomValuesWriter) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
dst = append(dst, &w.bloom)
dst = append(dst, &w.values)
return dst
func (w *bloomValuesWriter) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(&w.bloom))
pe.Add(fs.NewCloserTask(&w.values))
}

type bloomValuesStreamWriter struct {
@@ -158,22 +158,21 @@
func (sw *streamWriters) MustClose() {
// Flush and close files in parallel in order to reduce the time needed for this operation
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
&sw.columnNamesWriter,
&sw.columnIdxsWriter,
&sw.metaindexWriter,
&sw.indexWriter,
&sw.columnsHeaderIndexWriter,
&sw.columnsHeaderWriter,
&sw.timestampsWriter,
}

cs = sw.messageBloomValuesWriter.appendClosers(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(&sw.columnNamesWriter))
pe.Add(fs.NewCloserTask(&sw.columnIdxsWriter))
pe.Add(fs.NewCloserTask(&sw.metaindexWriter))
pe.Add(fs.NewCloserTask(&sw.indexWriter))
pe.Add(fs.NewCloserTask(&sw.columnsHeaderIndexWriter))
pe.Add(fs.NewCloserTask(&sw.columnsHeaderWriter))
pe.Add(fs.NewCloserTask(&sw.timestampsWriter))

sw.messageBloomValuesWriter.appendCloserTasks(&pe)
for i := range sw.bloomValuesShards {
cs = sw.bloomValuesShards[i].appendClosers(cs)
sw.bloomValuesShards[i].appendCloserTasks(&pe)
}

fs.MustCloseParallel(cs)
pe.Run()
}

func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter {
@@ -312,39 +311,39 @@
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)

var pfc filestream.ParallelFileCreator
var pe fsutil.ParallelExecutor

// Always cache columnNames file, since it is re-read immediately after part creation
var columnNamesWriter filestream.WriteCloser
pfc.Add(columnNamesPath, &columnNamesWriter, false)
pe.Add(filestream.NewFileCreatorTask(columnNamesPath, &columnNamesWriter, false))

// Always cache columnIdxs file, since it is re-read immediately after part creation
var columnIdxsWriter filestream.WriteCloser
pfc.Add(columnIdxsPath, &columnIdxsWriter, false)
pe.Add(filestream.NewFileCreatorTask(columnIdxsPath, &columnIdxsWriter, false))

// Always cache metaindex file, since it is re-read immediately after part creation
var metaindexWriter filestream.WriteCloser
pfc.Add(metaindexPath, &metaindexWriter, false)
pe.Add(filestream.NewFileCreatorTask(metaindexPath, &metaindexWriter, false))

var indexWriter filestream.WriteCloser
pfc.Add(indexPath, &indexWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(indexPath, &indexWriter, nocache))

var columnsHeaderIndexWriter filestream.WriteCloser
pfc.Add(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache))

var columnsHeaderWriter filestream.WriteCloser
pfc.Add(columnsHeaderPath, &columnsHeaderWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(columnsHeaderPath, &columnsHeaderWriter, nocache))

var timestampsWriter filestream.WriteCloser
pfc.Add(timestampsPath, &timestampsWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(timestampsPath, &timestampsWriter, nocache))

messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
var messageBloomValuesWriter bloomValuesStreamWriter
pfc.Add(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache)
pfc.Add(messageValuesPath, &messageBloomValuesWriter.values, nocache)
pe.Add(filestream.NewFileCreatorTask(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache))
pe.Add(filestream.NewFileCreatorTask(messageValuesPath, &messageBloomValuesWriter.values, nocache))

pfc.Run()
pe.Run()

createBloomValuesWriter := func(shardIdx uint64) bloomValuesStreamWriter {
bloomPath := getBloomFilePath(path, shardIdx)
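Under the same assumption that tasks are plain func() values, the task constructors used above could be thin adapters along the following lines. The names mirror the call sites in this diff; the bodies are sketches, since the real implementations are not visible here, and filestream.MustCreate / filestream.MustOpen with these signatures are assumed from the previous ParallelFileCreator / ParallelFileOpener API:

// In lib/fs: wrap any MustCloser so it can be queued on a ParallelExecutor.
func NewCloserTask(c MustCloser) func() {
	return func() {
		c.MustClose()
	}
}

// In lib/filestream: create the file at path and store the writer into *w.
func NewFileCreatorTask(path string, w *WriteCloser, nocache bool) func() {
	return func() {
		*w = MustCreate(path, nocache)
	}
}

// In lib/filestream: open the file at path for reading and store the reader into *r.
func NewFileOpenerTask(path string, r *ReadCloser, nocache bool) func() {
	return func() {
		*r = MustOpen(path, nocache)
	}
}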
27 changes: 14 additions & 13 deletions lib/logstorage/inmemory_part.go
@@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)

// inmemoryPart is an in-memory part.
@@ -120,26 +121,26 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)

var psw filestream.ParallelStreamWriter
var pe fsutil.ParallelExecutor

psw.Add(columnNamesPath, &mp.columnNames)
psw.Add(columnIdxsPath, &mp.columnIdxs)
psw.Add(metaindexPath, &mp.metaindex)
psw.Add(indexPath, &mp.index)
psw.Add(columnsHeaderIndexPath, &mp.columnsHeaderIndex)
psw.Add(columnsHeaderPath, &mp.columnsHeader)
psw.Add(timestampsPath, &mp.timestamps)
pe.Add(filestream.NewStreamWriterTask(columnNamesPath, &mp.columnNames))
pe.Add(filestream.NewStreamWriterTask(columnIdxsPath, &mp.columnIdxs))
pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindex))
pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.index))
pe.Add(filestream.NewStreamWriterTask(columnsHeaderIndexPath, &mp.columnsHeaderIndex))
pe.Add(filestream.NewStreamWriterTask(columnsHeaderPath, &mp.columnsHeader))
pe.Add(filestream.NewStreamWriterTask(timestampsPath, &mp.timestamps))

psw.Add(messageBloomFilterPath, &mp.messageBloomValues.bloom)
psw.Add(messageValuesPath, &mp.messageBloomValues.values)
pe.Add(filestream.NewStreamWriterTask(messageBloomFilterPath, &mp.messageBloomValues.bloom))
pe.Add(filestream.NewStreamWriterTask(messageValuesPath, &mp.messageBloomValues.values))

bloomPath := getBloomFilePath(path, 0)
psw.Add(bloomPath, &mp.fieldBloomValues.bloom)
pe.Add(filestream.NewStreamWriterTask(bloomPath, &mp.fieldBloomValues.bloom))

valuesPath := getValuesFilePath(path, 0)
psw.Add(valuesPath, &mp.fieldBloomValues.values)
pe.Add(filestream.NewStreamWriterTask(valuesPath, &mp.fieldBloomValues.values))

psw.Run()
pe.Run()

mp.ph.mustWriteMetadata(path)

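The stream-writer tasks above follow the same shape: the old filestream.ParallelStreamWriter took a path plus an in-memory buffer and flushed the buffer contents to the file. A sketch of the assumed constructor; cb.MustWriteTo is a hypothetical method standing in for whatever helper the real commit uses to dump a chunkedbuffer.Buffer to disk:

// In lib/filestream: return a task that dumps the in-memory buffer cb into a
// newly created file at path. cb.MustWriteTo is hypothetical; the actual
// buffer-flushing helper is not visible in this diff.
func NewStreamWriterTask(path string, cb *chunkedbuffer.Buffer) func() {
	return func() {
		w := MustCreate(path, false) // the old ParallelStreamWriter.Add did not take a nocache flag either
		cb.MustWriteTo(w)
		w.MustClose()
	}
}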
46 changes: 23 additions & 23 deletions lib/logstorage/part.go
@@ -9,6 +9,7 @@
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

@@ -54,10 +55,9 @@
values fs.MustReadAtCloser
}

func (r *bloomValuesReaderAt) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
dst = append(dst, r.bloom)
dst = append(dst, r.values)
return dst
func (r *bloomValuesReaderAt) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(r.bloom))
pe.Add(fs.NewCloserTask(r.values))
}

func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
@@ -137,36 +137,36 @@
mrs.MustClose()

// Open data files
p.indexFile = fs.MustOpenReaderAt(indexPath)
p.indexFile = fs.OpenReaderAt(indexPath)
if p.ph.FormatVersion >= 1 {
p.columnsHeaderIndexFile = fs.MustOpenReaderAt(columnsHeaderIndexPath)
p.columnsHeaderIndexFile = fs.OpenReaderAt(columnsHeaderIndexPath)
}
p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.MustOpenReaderAt(timestampsPath)
p.columnsHeaderFile = fs.OpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.OpenReaderAt(timestampsPath)

// Open files with bloom filters and column values
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
p.messageBloomValues.bloom = fs.MustOpenReaderAt(messageBloomFilterPath)
p.messageBloomValues.bloom = fs.OpenReaderAt(messageBloomFilterPath)

messageValuesPath := filepath.Join(path, messageValuesFilename)
p.messageBloomValues.values = fs.MustOpenReaderAt(messageValuesPath)
p.messageBloomValues.values = fs.OpenReaderAt(messageValuesPath)

if p.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
p.oldBloomValues.bloom = fs.MustOpenReaderAt(bloomPath)
p.oldBloomValues.bloom = fs.OpenReaderAt(bloomPath)

valuesPath := filepath.Join(path, oldValuesFilename)
p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath)
p.oldBloomValues.values = fs.OpenReaderAt(valuesPath)
} else {
p.bloomValuesShards = make([]bloomValuesReaderAt, p.ph.BloomValuesShardsCount)
for i := range p.bloomValuesShards {
shard := &p.bloomValuesShards[i]

bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = fs.MustOpenReaderAt(bloomPath)
shard.bloom = fs.OpenReaderAt(bloomPath)

valuesPath := getValuesFilePath(path, uint64(i))
shard.values = fs.MustOpenReaderAt(valuesPath)
shard.values = fs.OpenReaderAt(valuesPath)
}
}

@@ -176,25 +176,25 @@
func mustClosePart(p *part) {
// Close files in parallel in order to speed up this operation
// on high-latency storage systems such as NFS and Ceph.
var cs []fs.MustCloser
var pe fsutil.ParallelExecutor

cs = append(cs, p.indexFile)
pe.Add(fs.NewCloserTask(p.indexFile))
if p.ph.FormatVersion >= 1 {
cs = append(cs, p.columnsHeaderIndexFile)
pe.Add(fs.NewCloserTask(p.columnsHeaderIndexFile))
}
cs = append(cs, p.columnsHeaderFile)
cs = append(cs, p.timestampsFile)
cs = p.messageBloomValues.appendClosers(cs)
pe.Add(fs.NewCloserTask(p.columnsHeaderFile))
pe.Add(fs.NewCloserTask(p.timestampsFile))
p.messageBloomValues.appendCloserTasks(&pe)

if p.ph.FormatVersion < 1 {
cs = p.oldBloomValues.appendClosers(cs)
p.oldBloomValues.appendCloserTasks(&pe)
} else {
for i := range p.bloomValuesShards {
cs = p.bloomValuesShards[i].appendClosers(cs)
p.bloomValuesShards[i].appendCloserTasks(&pe)
}
}

fs.MustCloseParallel(cs)
pe.Run()

p.pt = nil
}