Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions flashring/cmd/flashringtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func main() {
planRandomGaussian()
} else if plan == "readthrough-batched" {
planReadthroughGaussianBatched()
} else if plan == "lockless" {
planLockless()
} else {
panic("invalid plan")
}
Expand Down
228 changes: 228 additions & 0 deletions flashring/cmd/flashringtest/plan_lockless.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof handlers on http.DefaultServeMux
	"os"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"strings"
	"sync"
	"time"

	cachepkg "github.com/Meesho/BharatMLStack/flashring/internal/cache"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

// planLockless benchmarks the lockless (PutLL/GetLL) cache API: it wipes the
// mount point, prepopulates roughly 70% of the key space, then runs concurrent
// readers with a gaussian key distribution that feed missed keys back to a
// pool of write workers (read-through).
func planLockless() {
	var (
		mountPoint         string
		numShards          int
		keysPerShard       int
		memtableMB         int
		fileSizeMultiplier int
		readWorkers        int
		writeWorkers       int
		sampleSecs         int
		iterations         int64
		aVal               float64
		logStats           bool
		memProfile         string
		cpuProfile         string
	)

	flag.StringVar(&mountPoint, "mount", "/media/a0d00kc/trishul/", "data directory for shard files")
	flag.IntVar(&numShards, "shards", 500, "number of shards")
	flag.IntVar(&keysPerShard, "keys-per-shard", 10_00_00, "keys per shard")
	flag.IntVar(&memtableMB, "memtable-mb", 16, "memtable size in MiB")
	flag.IntVar(&fileSizeMultiplier, "file-size-multiplier", 2, "file size in GiB per shard")
	flag.IntVar(&readWorkers, "readers", 8, "number of read workers")
	flag.IntVar(&writeWorkers, "writers", 8, "number of write workers")
	flag.IntVar(&sampleSecs, "sample-secs", 30, "predictor sampling window in seconds")
	flag.Int64Var(&iterations, "iterations", 100_000_000, "number of iterations")
	flag.Float64Var(&aVal, "a", 0.4, "a value for the predictor")
	flag.BoolVar(&logStats, "log-stats", true, "periodically log cache stats")
	flag.StringVar(&memProfile, "memprofile", "mem.prof", "write memory profile to this file")
	flag.StringVar(&cpuProfile, "cpuprofile", "", "write cpu profile to this file")
	flag.Parse()
	// NOTE(review): -iterations and -a are parsed but not used by this plan;
	// they are kept so the CLI stays compatible with the other plans.

	zerolog.SetGlobalLevel(zerolog.InfoLevel)

	// pprof HTTP server on the default mux. The /debug/pprof handlers come
	// from a blank import of net/http/pprof.
	go func() {
		log.Info().Msg("Starting pprof server on :8080")
		log.Info().Msg("Access profiles at: http://localhost:8080/debug/pprof/")
		log.Info().Msg("Memory profile: http://localhost:8080/debug/pprof/heap")
		log.Info().Msg("Goroutine profile: http://localhost:8080/debug/pprof/goroutine")
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Error().Err(err).Msg("pprof server failed")
		}
	}()

	// Optional CPU profile covering the whole run.
	if cpuProfile != "" {
		f, err := os.Create(cpuProfile)
		if err != nil {
			log.Fatal().Err(err).Msg("could not create CPU profile")
		}
		defer f.Close()
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal().Err(err).Msg("could not start CPU profile")
		}
		defer pprof.StopCPUProfile()
	}

	// Start from a clean slate: remove every file inside the mount point.
	files, err := os.ReadDir(mountPoint)
	if err != nil {
		panic(err)
	}
	for _, file := range files {
		os.Remove(filepath.Join(mountPoint, file.Name()))
	}

	memtableSizeInBytes := int32(memtableMB) * 1024 * 1024
	fileSizeInBytes := int64(fileSizeMultiplier) * int64(memtableSizeInBytes)

	cfg := cachepkg.WrapCacheConfig{
		NumShards:             numShards,
		KeysPerShard:          keysPerShard,
		FileSize:              fileSizeInBytes,
		MemtableSize:          memtableSizeInBytes,
		ReWriteScoreThreshold: 0.8,
		GridSearchEpsilon:     0.0001,
		SampleDuration:        time.Duration(sampleSecs) * time.Second,

		// Pass the metrics collector to record cache metrics
		MetricsRecorder: InitMetricsCollector(),
	}

	// Record input parameters the cache itself doesn't know about.
	metricsCollector.SetShards(numShards)
	metricsCollector.SetKeysPerShard(keysPerShard)
	metricsCollector.SetReadWorkers(readWorkers)
	metricsCollector.SetWriteWorkers(writeWorkers)
	metricsCollector.SetPlan("lockless")

	// Wait for a shutdown signal in the background and export metrics CSV.
	go RunmetricsWaitForShutdown()

	pc, err := cachepkg.NewWrapCache(cfg, mountPoint, logStats)
	if err != nil {
		panic(err)
	}

	// Each reader performs totalKeys*MULTIPLIER lookups.
	const MULTIPLIER = 300

	missedKeyChanList := make([]chan int, writeWorkers)
	for i := 0; i < writeWorkers; i++ {
		missedKeyChanList[i] = make(chan int)
	}

	totalKeys := keysPerShard * numShards
	str1kb := strings.Repeat("a", 1024)
	str1kb = "%d" + str1kb // value format: decimal key followed by 1 KiB of 'a'

	var wg sync.WaitGroup
	var writeWg sync.WaitGroup

	// Prepopulate ~70% of the key space (each key is skipped with p=0.3).
	fmt.Printf("----------------------------------------------prepopulating keys\n")
	for k := 0; k < totalKeys; k++ {
		if rand.Intn(100) < 30 {
			continue
		}

		key := fmt.Sprintf("key%d", k)
		val := []byte(fmt.Sprintf(str1kb, k))
		if err := pc.PutLL(key, val, 60); err != nil {
			panic(err)
		}
		if k%5000000 == 0 {
			fmt.Printf("----------------------------------------------prepopulated %d keys\n", k)
		}
	}

	if writeWorkers > 0 {
		fmt.Printf("----------------------------------------------starting write workers\n")
		writeWg.Add(writeWorkers)

		for w := 0; w < writeWorkers; w++ {
			go func(workerID int) {
				defer writeWg.Done()

				// Drain missed keys until the coordinator closes this
				// channel after all readers have finished.
				for mk := range missedKeyChanList[workerID] {
					key := fmt.Sprintf("key%d", mk)
					val := []byte(fmt.Sprintf(str1kb, mk))
					if err := pc.PutLL(key, val, 60); err != nil {
						panic(err)
					}
				}
			}(w)
		}
	}

	if readWorkers > 0 {
		fmt.Printf("----------------------------------------------reading keys\n")
		wg.Add(readWorkers)

		for r := 0; r < readWorkers; r++ {
			go func(workerID int) {
				defer wg.Done()
				for k := 0; k < totalKeys*MULTIPLIER; k++ {
					randomval := normalDistIntPartitioned(workerID, readWorkers, totalKeys)
					key := fmt.Sprintf("key%d", randomval)
					val, found, expired := pc.GetLL(key)

					// Read-through: hand the missed key to a write worker.
					// Guard against -writers=0, which previously caused an
					// integer divide-by-zero panic on the first miss.
					if !found && writeWorkers > 0 {
						missedKeyChanList[randomval%writeWorkers] <- randomval
					}

					if expired {
						panic("key expired")
					}
					if found && string(val) != fmt.Sprintf(str1kb, randomval) {
						panic("value mismatch")
					}
					if k%5000000 == 0 {
						fmt.Printf("----------------------------------------------read %d keys %d readerid\n", k, workerID)
					}
				}
			}(r)
		}
	}

	wg.Wait()

	// Readers are done, so no further misses can arrive: close the miss
	// channels so the write workers drain and exit, then wait for them.
	// (Previously the channels were never closed and writeWg was never
	// waited on, leaking the write workers and racing pending writes at
	// program exit.)
	for _, ch := range missedKeyChanList {
		close(ch)
	}
	writeWg.Wait()

	log.Info().Msgf("done putting")

	// Optional heap profile at the end of the run.
	if memProfile != "" {
		runtime.GC() // get up-to-date statistics
		f, err := os.Create(memProfile)
		if err != nil {
			log.Fatal().Err(err).Msg("could not create memory profile")
		}
		defer f.Close()
		if err := pprof.WriteHeapProfile(f); err != nil {
			log.Fatal().Err(err).Msg("could not write memory profile")
		}
		log.Info().Msgf("Memory profile written to %s", memProfile)
	}

	// Final process memory statistics.
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	log.Info().
		Str("alloc", fmt.Sprintf("%.2f MB", float64(m.Alloc)/1024/1024)).
		Str("total_alloc", fmt.Sprintf("%.2f MB", float64(m.TotalAlloc)/1024/1024)).
		Str("sys", fmt.Sprintf("%.2f MB", float64(m.Sys)/1024/1024)).
		Uint32("num_gc", m.NumGC).
		Msg("Memory statistics")
}
6 changes: 3 additions & 3 deletions flashring/cmd/flashringtest/plan_readthrough_gausian.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func planReadthroughGaussian() {
)

flag.StringVar(&mountPoint, "mount", "/media/a0d00kc/trishul/", "data directory for shard files")
flag.IntVar(&numShards, "shards", 200, "number of shards")
flag.IntVar(&keysPerShard, "keys-per-shard", 10_00_00, "keys per shard")
flag.IntVar(&numShards, "shards", 500, "number of shards")
flag.IntVar(&keysPerShard, "keys-per-shard", 4_00_00, "keys per shard")
flag.IntVar(&memtableMB, "memtable-mb", 16, "memtable size in MiB")
flag.IntVar(&fileSizeMultiplier, "file-size-multiplier", 10, "file size in GiB per shard")
flag.IntVar(&fileSizeMultiplier, "file-size-multiplier", 2, "file size in GiB per shard")
flag.IntVar(&readWorkers, "readers", 8, "number of read workers")
flag.IntVar(&writeWorkers, "writers", 8, "number of write workers")
flag.IntVar(&sampleSecs, "sample-secs", 30, "predictor sampling window in seconds")
Expand Down
75 changes: 72 additions & 3 deletions flashring/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
ErrMemtableSizeNotMultipleOf4KB = fmt.Errorf("memtable size must be a multiple of 4KB")
ErrFileSizeLessThan1 = fmt.Errorf("file size must be greater than 0")
ErrFileSizeNotMultipleOf4KB = fmt.Errorf("file size must be a multiple of 4KB")
Seed = strconv.Itoa(int(time.Now().UnixNano()))
Seed = xxhash.Sum64String(strconv.Itoa(int(time.Now().UnixNano())))
)

type WrapCache struct {
Expand Down Expand Up @@ -307,6 +307,76 @@ func NewWrapCache(config WrapCacheConfig, mountPoint string, logStats bool) (*Wr
return wc, nil
}

// PutLL stores key/value with the given TTL via the lockless write path:
// the request is handed to the owning shard's write channel and the call
// blocks until the shard's writer reports the outcome on a pooled channel.
func (wc *WrapCache) PutLL(key string, value []byte, exptimeInMinutes uint16) error {
	hash := wc.Hash(key)
	idx := hash % uint32(len(wc.shards))
	begin := time.Now()

	// Single-slot result channel recycled through a pool to keep the hot
	// path allocation-free.
	resultCh := filecache.ErrorPool.Get().(chan error)

	wc.shards[idx].WriteCh <- &filecache.WriteRequestV2{
		Key:              key,
		Value:            value,
		ExptimeInMinutes: exptimeInMinutes,
		Result:           resultCh,
	}

	// Sample roughly 10% of puts (keyed off the hash) to refresh the
	// shard's active-entry gauge while the write is in flight.
	if hash%100 < 10 {
		wc.stats[idx].ShardWiseActiveEntries.Store(uint64(wc.shards[idx].GetRingBufferActiveEntries()))
	}

	writeErr := <-resultCh
	filecache.ErrorPool.Put(resultCh)

	wc.stats[idx].TotalPuts.Add(1)
	wc.stats[idx].LatencyTracker.RecordPut(time.Since(begin))
	return writeErr
}

// GetLL looks up key via the lockless read path. It first tries the shard's
// fast path; only when that reports needsSlowPath does it dispatch a pooled
// request to the shard's read channel. Returns (value, found, expired).
func (wc *WrapCache) GetLL(key string) ([]byte, bool, bool) {
	hash := wc.Hash(key)
	idx := hash % uint32(len(wc.shards))
	begin := time.Now()

	found, value, _, expired, needsSlowPath := wc.shards[idx].GetFastPath(key)
	if !needsSlowPath {
		switch {
		case found && !expired:
			wc.stats[idx].Hits.Add(1)
		case expired:
			wc.stats[idx].Expired.Add(1)
		}
		wc.stats[idx].TotalGets.Add(1)
		wc.stats[idx].LatencyTracker.RecordGet(time.Since(begin))
		return value, found, expired
	}

	// Slow path: request and result channel are recycled through pools so
	// the miss path stays allocation-free.
	resultCh := filecache.ReadResultPool.Get().(chan filecache.ReadResultV2)
	req := filecache.ReadRequestPool.Get().(*filecache.ReadRequestV2)
	req.Key = key
	req.Result = resultCh

	wc.shards[idx].ReadCh <- req
	res := <-resultCh

	filecache.ReadResultPool.Put(resultCh)
	filecache.ReadRequestPool.Put(req)

	if res.Found && !res.Expired {
		wc.stats[idx].Hits.Add(1)
	}
	if res.Expired {
		wc.stats[idx].Expired.Add(1)
	}
	wc.stats[idx].TotalGets.Add(1)
	wc.stats[idx].LatencyTracker.RecordGet(time.Since(begin))

	return res.Data, res.Found, res.Expired
}

func (wc *WrapCache) Put(key string, value []byte, exptimeInMinutes uint16) error {

h32 := wc.Hash(key)
Expand Down Expand Up @@ -376,8 +446,7 @@ func (wc *WrapCache) Get(key string) ([]byte, bool, bool) {
}

func (wc *WrapCache) Hash(key string) uint32 {
nKey := key + Seed
return uint32(xxhash.Sum64String(nKey))
return uint32(xxhash.Sum64String(key) ^ Seed)
}

func (wc *WrapCache) GetShardCache(shardIdx int) *filecache.ShardCache {
Expand Down
33 changes: 33 additions & 0 deletions flashring/internal/shard/batch_reader_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ type ReadResultV2 struct {
Error error
}

// WriteRequestV2 is a single write submitted to a shard's write channel by
// the lockless put path; the outcome is delivered back on Result.
type WriteRequestV2 struct {
	Key              string
	Value            []byte
	ExptimeInMinutes uint16     // TTL in minutes
	Result           chan error // receives the write outcome (nil on success)
}

type BatchReaderV2 struct {
Requests chan *ReadRequestV2
batchWindow time.Duration
Expand All @@ -35,6 +42,32 @@ type BatchReaderV2Config struct {
MaxBatchSize int
}

// ReadRequestPool recycles ReadRequestV2 structs so the lockless read path
// does not allocate a request per call. Callers reassign Key/Result before
// each use.
var ReadRequestPool = sync.Pool{
	New: func() interface{} {
		return &ReadRequestV2{}
	},
}

// ReadResultPool recycles single-slot channels used to hand a ReadResultV2
// back to the caller; capacity 1 so the responder's send never blocks.
var ReadResultPool = sync.Pool{
	New: func() interface{} {
		return make(chan ReadResultV2, 1)
	},
}

// ErrorPool recycles single-slot error channels used by the lockless write
// path to receive the write outcome; capacity 1 so the send never blocks.
var ErrorPool = sync.Pool{
	New: func() interface{} {
		return make(chan error, 1)
	},
}

// BufPool recycles 4 KiB scratch buffers.
var BufPool = sync.Pool{
	New: func() interface{} {
		// Allocate max expected size - use pointer to avoid allocation on Put
		buf := make([]byte, 4096)
		return &buf
	},
}

func NewBatchReaderV2(config BatchReaderV2Config, sc *ShardCache, sl *sync.RWMutex) *BatchReaderV2 {
br := &BatchReaderV2{
Requests: make(chan *ReadRequestV2, config.MaxBatchSize*2),
Expand Down
Loading