Skip to content

Commit f63decf

Browse files
committed
Add WAL checkpoint manager and related tests for improved data integrity
This commit introduces a new WALCheckpointManager in the core package, which manages periodic checkpoints of the Write-Ahead Log (WAL) to enhance database integrity. The manager is designed to register active buckets, execute checkpoints, and handle bucket scanning. Additionally, comprehensive tests are added to validate the functionality of the WAL checkpoint manager, ensuring reliable performance in the VFS. This enhancement contributes to better resource management and data consistency.
1 parent 38b56a9 commit f63decf

3 files changed

Lines changed: 430 additions & 1 deletion

File tree

core/wal_checkpoint_manager.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
package core
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"os"
7+
"path/filepath"
8+
"sync"
9+
"time"
10+
)
11+
12+
// WALCheckpointManager 定期刷新 WAL 的管理器
13+
type WALCheckpointManager struct {
14+
dataPath string
15+
checkpointInterval time.Duration
16+
stopCh chan struct{}
17+
wg sync.WaitGroup
18+
mu sync.RWMutex
19+
activeBuckets map[int64]time.Time // bktID -> last checkpoint time
20+
}
21+
22+
// WALCheckpointConfig WAL checkpoint 配置
23+
type WALCheckpointConfig struct {
24+
CheckpointInterval time.Duration // checkpoint 间隔
25+
}
26+
27+
// DefaultWALCheckpointConfig 返回默认配置
28+
func DefaultWALCheckpointConfig() *WALCheckpointConfig {
29+
return &WALCheckpointConfig{
30+
CheckpointInterval: 10 * time.Second, // 每10秒刷新一次
31+
}
32+
}
33+
34+
// NewWALCheckpointManager 创建 WAL checkpoint 管理器
35+
func NewWALCheckpointManager(dataPath string, config *WALCheckpointConfig) *WALCheckpointManager {
36+
if config == nil {
37+
config = DefaultWALCheckpointConfig()
38+
}
39+
40+
return &WALCheckpointManager{
41+
dataPath: dataPath,
42+
checkpointInterval: config.CheckpointInterval,
43+
stopCh: make(chan struct{}),
44+
activeBuckets: make(map[int64]time.Time),
45+
}
46+
}
47+
48+
// Start 启动管理器
49+
func (wcm *WALCheckpointManager) Start() error {
50+
if wcm.checkpointInterval == 0 {
51+
log.Printf("[WAL Checkpoint Manager] Checkpoint disabled (interval=0)")
52+
return nil
53+
}
54+
55+
wcm.wg.Add(1)
56+
go wcm.checkpointLoop()
57+
58+
log.Printf("[WAL Checkpoint Manager] Started: interval=%v, dataPath=%s",
59+
wcm.checkpointInterval, wcm.dataPath)
60+
return nil
61+
}
62+
63+
// Stop 停止管理器
64+
func (wcm *WALCheckpointManager) Stop() {
65+
close(wcm.stopCh)
66+
wcm.wg.Wait()
67+
log.Printf("[WAL Checkpoint Manager] Stopped")
68+
}
69+
70+
// RegisterBucket 注册一个活跃的桶(有写入操作)
71+
func (wcm *WALCheckpointManager) RegisterBucket(bktID int64) {
72+
wcm.mu.Lock()
73+
defer wcm.mu.Unlock()
74+
wcm.activeBuckets[bktID] = time.Now()
75+
}
76+
77+
// checkpointLoop 定期执行 checkpoint
78+
func (wcm *WALCheckpointManager) checkpointLoop() {
79+
defer wcm.wg.Done()
80+
81+
ticker := time.NewTicker(wcm.checkpointInterval)
82+
defer ticker.Stop()
83+
84+
for {
85+
select {
86+
case <-wcm.stopCh:
87+
return
88+
case <-ticker.C:
89+
wcm.runCheckpoint()
90+
}
91+
}
92+
}
93+
94+
// runCheckpoint 执行一次 checkpoint
95+
func (wcm *WALCheckpointManager) runCheckpoint() {
96+
// 获取活跃桶列表
97+
wcm.mu.RLock()
98+
buckets := make([]int64, 0, len(wcm.activeBuckets))
99+
for bktID := range wcm.activeBuckets {
100+
buckets = append(buckets, bktID)
101+
}
102+
wcm.mu.RUnlock()
103+
104+
if len(buckets) == 0 {
105+
// 没有活跃桶,扫描 dataPath 下的所有桶
106+
buckets = wcm.scanBuckets()
107+
}
108+
109+
// 对每个桶执行 checkpoint
110+
successCount := 0
111+
errorCount := 0
112+
113+
for _, bktID := range buckets {
114+
if err := wcm.checkpointBucket(bktID); err != nil {
115+
log.Printf("[WAL Checkpoint Manager] ERROR: Failed to checkpoint bucket %d: %v", bktID, err)
116+
errorCount++
117+
} else {
118+
successCount++
119+
}
120+
}
121+
122+
if successCount > 0 || errorCount > 0 {
123+
log.Printf("[WAL Checkpoint Manager] Checkpoint completed: success=%d, error=%d, total=%d",
124+
successCount, errorCount, len(buckets))
125+
}
126+
}
127+
128+
// scanBuckets 扫描 dataPath 下的所有桶目录
129+
func (wcm *WALCheckpointManager) scanBuckets() []int64 {
130+
buckets := make([]int64, 0)
131+
132+
// 读取 dataPath 目录
133+
entries, err := os.ReadDir(wcm.dataPath)
134+
if err != nil {
135+
log.Printf("[WAL Checkpoint Manager] ERROR: Failed to read dataPath %s: %v", wcm.dataPath, err)
136+
return buckets
137+
}
138+
139+
// 查找所有数字命名的目录(桶ID)
140+
for _, entry := range entries {
141+
if !entry.IsDir() {
142+
continue
143+
}
144+
145+
var bktID int64
146+
if _, err := fmt.Sscanf(entry.Name(), "%d", &bktID); err == nil {
147+
buckets = append(buckets, bktID)
148+
}
149+
}
150+
151+
return buckets
152+
}
153+
154+
// checkpointBucket 对指定桶执行 checkpoint
155+
func (wcm *WALCheckpointManager) checkpointBucket(bktID int64) error {
156+
// 构造桶数据库路径
157+
bktDirPath := filepath.Join(wcm.dataPath, fmt.Sprint(bktID))
158+
dbPath := filepath.Join(bktDirPath, ".db")
159+
160+
// 检查数据库文件是否存在
161+
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
162+
// 数据库不存在,跳过
163+
return nil
164+
}
165+
166+
// 检查 WAL 文件是否存在
167+
walPath := dbPath + "-wal"
168+
walInfo, err := os.Stat(walPath)
169+
if os.IsNotExist(err) {
170+
// WAL 文件不存在,跳过
171+
return nil
172+
}
173+
if err != nil {
174+
return fmt.Errorf("failed to stat WAL file: %w", err)
175+
}
176+
177+
// 如果 WAL 文件为空或很小,跳过
178+
if walInfo.Size() < 1024 {
179+
return nil
180+
}
181+
182+
// 获取数据库连接
183+
db, err := GetWriteDB(bktDirPath, "")
184+
if err != nil {
185+
return fmt.Errorf("failed to get database connection: %w", err)
186+
}
187+
// 注意:不要关闭连接,它来自连接池
188+
189+
// 执行 WAL checkpoint (TRUNCATE 模式)
190+
// TRUNCATE 模式会将 WAL 内容写入主数据库文件,并截断 WAL 文件
191+
var busy, logPages, checkpointed int
192+
err = db.QueryRow("PRAGMA wal_checkpoint(TRUNCATE)").Scan(&busy, &logPages, &checkpointed)
193+
if err != nil {
194+
return fmt.Errorf("failed to execute checkpoint: %w", err)
195+
}
196+
197+
// 记录 checkpoint 结果
198+
if checkpointed > 0 {
199+
log.Printf("[WAL Checkpoint Manager] Checkpointed bucket %d: busy=%d, logPages=%d, checkpointed=%d, walSize=%d",
200+
bktID, busy, logPages, checkpointed, walInfo.Size())
201+
}
202+
203+
return nil
204+
}
205+
206+
// GetStats 获取统计信息
207+
func (wcm *WALCheckpointManager) GetStats() map[string]interface{} {
208+
wcm.mu.RLock()
209+
defer wcm.mu.RUnlock()
210+
211+
return map[string]interface{}{
212+
"dataPath": wcm.dataPath,
213+
"checkpointInterval": wcm.checkpointInterval.String(),
214+
"activeBuckets": len(wcm.activeBuckets),
215+
}
216+
}
217+

0 commit comments

Comments
 (0)