Skip to content

Commit 6a40deb

Browse files
Merge pull request #353 from Meesho/async-data-logger-metrics
metric updates and GCS uploader changes
1 parent 68cc6ef commit 6a40deb

14 files changed

Lines changed: 378 additions & 677 deletions

asyncloguploader/config.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ type Config struct {
2121
FlushTimeout time.Duration // Wait for write completion before flush (default: 10ms)
2222

2323
// Upload configuration
24-
UploadChannel chan<- string // Optional: channel for completed files
25-
GCSUploadConfig *GCSUploadConfig // Optional: GCS upload configuration
24+
GCSUploadConfig *GCSUploadConfig // Optional: GCS upload configuration (uploader scans log dir)
25+
26+
// MetricTags are application-provided tags propagated to all metric emissions
27+
// (e.g., from metric.BuildTag(metric.NewTag("service", "x"), ...))
28+
MetricTags []string
2629
}
2730

2831
// GCSUploadConfig holds configuration for GCS uploader
@@ -34,7 +37,7 @@ type GCSUploadConfig struct {
3437
MaxRetries int // Max retry attempts (default: 3)
3538
RetryDelay time.Duration // Delay between retries (default: 5s)
3639
GRPCPoolSize int // gRPC connection pool size (default: 64)
37-
ChannelBufferSize int // Upload channel buffer size (default: 100)
40+
ScanInterval time.Duration // How often to scan log dir for .log files (default: 10s)
3841
}
3942

4043
// DefaultConfig returns a configuration with baseline defaults
@@ -47,7 +50,6 @@ func DefaultConfig(logPath string) Config {
4750
PreallocateFileSize: 0, // Disabled by default
4851
FlushInterval: 10 * time.Second,
4952
FlushTimeout: 10 * time.Millisecond,
50-
UploadChannel: nil, // Optional
5153
GCSUploadConfig: nil, // Optional
5254
}
5355
}
@@ -62,7 +64,7 @@ func DefaultGCSUploadConfig(bucket string) GCSUploadConfig {
6264
MaxRetries: 3,
6365
RetryDelay: 5 * time.Second,
6466
GRPCPoolSize: 64,
65-
ChannelBufferSize: 100,
67+
ScanInterval: 10 * time.Second,
6668
}
6769
}
6870

@@ -130,8 +132,8 @@ func (g *GCSUploadConfig) Validate() error {
130132
g.GRPCPoolSize = 64
131133
}
132134

133-
if g.ChannelBufferSize <= 0 {
134-
g.ChannelBufferSize = 100
135+
if g.ScanInterval <= 0 {
136+
g.ScanInterval = 10 * time.Second
135137
}
136138

137139
return nil

asyncloguploader/file_rename.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package asyncloguploader
2+
3+
import (
4+
"os"
5+
"strings"
6+
"time"
7+
8+
"github.com/Meesho/go-core/metric"
9+
)
10+
11+
const renameRetryCount = 3
12+
const renameRetryDelay = 10 * time.Millisecond
13+
14+
// renameTmpToLog renames a .log.tmp file to .log with retries.
15+
// On failure after retries, emits MetricFileRenameFailed and returns nil (ignore per spec).
16+
func renameTmpToLog(tmpPath string, metricTags []string) {
17+
logPath := strings.TrimSuffix(tmpPath, ".tmp")
18+
if logPath == tmpPath {
19+
return // not a .tmp file, nothing to do
20+
}
21+
for i := 0; i < renameRetryCount; i++ {
22+
if err := os.Rename(tmpPath, logPath); err == nil {
23+
return
24+
}
25+
time.Sleep(renameRetryDelay)
26+
}
27+
metric.Incr(MetricFileRenameFailed, getBaseTags(metricTags))
28+
}

asyncloguploader/file_writer_default.go

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/Meesho/go-core/metric"
1415
logger "github.com/rs/zerolog/log"
1516
)
1617

@@ -39,21 +40,23 @@ type SizeFileWriter struct {
3940
// Last write duration (for metrics tracking)
4041
lastPwritevDuration atomic.Int64 // Nanoseconds
4142

42-
// Channel for completed files (for GCS upload)
43-
completedFileChan chan<- string
43+
// Metric tags for emissions
44+
metricTags []string
4445
}
4546

46-
// NewSizeFileWriter creates a new SizeFileWriter (non-Linux fallback)
47-
func NewSizeFileWriter(config Config, completedFileChan chan<- string) (*SizeFileWriter, error) {
47+
// NewSizeFileWriter creates a new SizeFileWriter (non-Linux fallback).
48+
// Files are written with .log.tmp suffix during active write; renamed to .log on rotation/close.
49+
func NewSizeFileWriter(config Config, metricTags []string) (*SizeFileWriter, error) {
4850
// Extract base directory and filename
4951
baseDir, baseFileName, err := extractBasePathSize(config.LogFilePath)
5052
if err != nil {
5153
return nil, fmt.Errorf("failed to extract base path: %w", err)
5254
}
5355

5456
// Generate timestamped filename for initial file
57+
// Use .log.tmp during active write; rename to .log on rotation/close
5558
timestamp := time.Now().Format("2006-01-02_15-04-05")
56-
initialPath := filepath.Join(baseDir, fmt.Sprintf("%s_%s.log", baseFileName, timestamp))
59+
initialPath := filepath.Join(baseDir, fmt.Sprintf("%s_%s.log.tmp", baseFileName, timestamp))
5760

5861
// Open initial file (always starts at offset 0 for new files)
5962
file, err := openDirectIOSize(initialPath, config.PreallocateFileSize)
@@ -69,7 +72,7 @@ func NewSizeFileWriter(config Config, completedFileChan chan<- string) (*SizeFil
6972
baseDir: baseDir,
7073
baseFileName: baseFileName,
7174
preallocateFileSize: config.PreallocateFileSize,
72-
completedFileChan: completedFileChan,
75+
metricTags: getBaseTags(metricTags),
7376
}
7477

7578
// New files always start at offset 0
@@ -93,7 +96,6 @@ func (fw *SizeFileWriter) WriteVectored(buffers [][]byte) (int, error) {
9396
// Get current offset
9497
offset := fw.fileOffset.Load()
9598

96-
// Write sequentially (non-Linux fallback)
9799
writeStart := time.Now()
98100
totalWritten := 0
99101
for _, buf := range buffers {
@@ -103,14 +105,17 @@ func (fw *SizeFileWriter) WriteVectored(buffers [][]byte) (int, error) {
103105
n, err := fw.file.WriteAt(buf, offset+int64(totalWritten))
104106
if err != nil {
105107
logger.Error().Err(err).Msgf("write failed (file=%s offset=%d)", fw.filePath, offset+int64(totalWritten))
106-
fw.lastPwritevDuration.Store(time.Since(writeStart).Nanoseconds())
108+
writeDuration := time.Since(writeStart)
109+
fw.lastPwritevDuration.Store(writeDuration.Nanoseconds())
110+
metric.Timing(MetricFileWriterWriteDuration, writeDuration, fw.metricTags)
107111
return totalWritten, err
108112
}
109113
totalWritten += n
110114
}
111115
writeDuration := time.Since(writeStart)
112116

113117
fw.lastPwritevDuration.Store(writeDuration.Nanoseconds())
118+
metric.Timing(MetricFileWriterWriteDuration, writeDuration, fw.metricTags)
114119
fw.fileOffset.Add(int64(totalWritten))
115120

116121
return totalWritten, nil
@@ -129,7 +134,6 @@ func (fw *SizeFileWriter) Close() error {
129134
// We need to complete the rotation: swap files, then close both
130135
if fw.nextFile != nil && fw.file != nil {
131136
// Complete the rotation by swapping files
132-
// This will send the current file to upload channel
133137
if err := fw.swapFiles(); err != nil && firstErr == nil {
134138
firstErr = fmt.Errorf("failed to complete rotation during close: %w", err)
135139
}
@@ -167,16 +171,9 @@ func (fw *SizeFileWriter) Close() error {
167171
firstErr = err
168172
}
169173

170-
// Send completed file to upload channel (non-blocking) if it has data
171-
if hasData && fw.completedFileChan != nil {
172-
select {
173-
case fw.completedFileChan <- completedFilePath:
174-
// Successfully sent to channel
175-
default:
176-
// Channel full - log warning but don't block close
177-
logger.Warn().Msgf("upload channel full, skipping upload (file=%s)", completedFilePath)
178-
fmt.Printf("[WARNING] Upload channel full, skipping upload for %s\n", completedFilePath)
179-
}
174+
// Rename .tmp to .log (retry 3x, emit metric on failure)
175+
if hasData {
176+
renameTmpToLog(completedFilePath, fw.metricTags)
180177
}
181178

182179
fw.file = nil
@@ -234,7 +231,7 @@ func (fw *SizeFileWriter) rotateIfNeeded() error {
234231
// createNextFile creates a new file for rotation
235232
func (fw *SizeFileWriter) createNextFile() error {
236233
timestamp := time.Now().Format("2006-01-02_15-04-05")
237-
nextPath := filepath.Join(fw.baseDir, fmt.Sprintf("%s_%s.log", fw.baseFileName, timestamp))
234+
nextPath := filepath.Join(fw.baseDir, fmt.Sprintf("%s_%s.log.tmp", fw.baseFileName, timestamp))
238235

239236
file, err := openDirectIOSize(nextPath, fw.preallocateFileSize)
240237
if err != nil {
@@ -281,27 +278,21 @@ func (fw *SizeFileWriter) swapFiles() error {
281278
return fmt.Errorf("failed to close current file: %w", err)
282279
}
283280

284-
// Send completed file to upload channel (non-blocking)
285-
if fw.completedFileChan != nil {
286-
select {
287-
case fw.completedFileChan <- completedFilePath:
288-
default:
289-
logger.Warn().Msgf("upload channel full, skipping upload (file=%s)", completedFilePath)
290-
fmt.Printf("[WARNING] Upload channel full, skipping upload for %s\n", completedFilePath)
291-
}
292-
}
281+
// Rename .tmp to .log (retry 3x, emit metric on failure)
282+
renameTmpToLog(completedFilePath, fw.metricTags)
293283

294284
// Swap next file to current
295285
fw.file = fw.nextFile
296286
fw.fd = fw.nextFd
297287
fw.filePath = fw.nextFilePath
298288
fw.fileOffset.Store(0)
299289

300-
// Clear next file fields
301290
fw.nextFile = nil
302291
fw.nextFd = 0
303292
fw.nextFilePath = ""
304293

294+
metric.Incr(MetricFileWriterRotationCount, fw.metricTags)
295+
305296
return nil
306297
}
307298

asyncloguploader/file_writer_linux.go

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/Meesho/go-core/metric"
1415
logger "github.com/rs/zerolog/log"
1516
"golang.org/x/sys/unix"
1617
)
@@ -46,22 +47,24 @@ type SizeFileWriter struct {
4647
// Last Pwritev duration (for metrics tracking)
4748
lastPwritevDuration atomic.Int64 // Nanoseconds
4849

49-
// Channel for completed files (for GCS upload)
50-
completedFileChan chan<- string
50+
// Metric tags for emissions (e.g., from Config.MetricTags + event_name)
51+
metricTags []string
5152
}
5253

53-
// NewSizeFileWriter creates a new SizeFileWriter with the given configuration
54-
// completedFileChan is optional - if provided, completed files will be sent to this channel for upload
55-
func NewSizeFileWriter(config Config, completedFileChan chan<- string) (*SizeFileWriter, error) {
54+
// NewSizeFileWriter creates a new SizeFileWriter with the given configuration.
55+
// Files are written with .log.tmp suffix during active write; renamed to .log on rotation/close.
56+
// metricTags are propagated to metric emissions (e.g., from MergeMetricTags(config.MetricTags, eventName))
57+
func NewSizeFileWriter(config Config, metricTags []string) (*SizeFileWriter, error) {
5658
// Extract base directory and filename
5759
baseDir, baseFileName, err := extractBasePathSize(config.LogFilePath)
5860
if err != nil {
5961
return nil, fmt.Errorf("failed to extract base path: %w", err)
6062
}
6163

6264
// Generate timestamped filename for initial file (consistent naming)
65+
// Use .log.tmp during active write; rename to .log on rotation/close
6366
timestamp := time.Now().Format("2006-01-02_15-04-05")
64-
initialPath := filepath.Join(baseDir, fmt.Sprintf("%s_%s.log", baseFileName, timestamp))
67+
initialPath := filepath.Join(baseDir, fmt.Sprintf("%s_%s.log.tmp", baseFileName, timestamp))
6568

6669
// Open initial file with preallocation (always starts at offset 0 for new files)
6770
file, err := openDirectIOSize(initialPath, config.PreallocateFileSize)
@@ -77,7 +80,7 @@ func NewSizeFileWriter(config Config, completedFileChan chan<- string) (*SizeFil
7780
baseDir: baseDir,
7881
baseFileName: baseFileName,
7982
preallocateFileSize: config.PreallocateFileSize,
80-
completedFileChan: completedFileChan,
83+
metricTags: getBaseTags(metricTags),
8184
}
8285

8386
// New files always start at offset 0
@@ -108,15 +111,14 @@ func (fw *SizeFileWriter) WriteVectored(buffers [][]byte) (int, error) {
108111
n, err := writevAlignedWithOffset(fw.fd, buffers, offset)
109112
pwritevDuration := time.Since(pwritevStart)
110113

111-
// Store write duration for metrics
112114
fw.lastPwritevDuration.Store(pwritevDuration.Nanoseconds())
115+
metric.Timing(MetricFileWriterWriteDuration, pwritevDuration, fw.metricTags)
113116

114117
if err != nil {
115118
logger.Error().Err(err).Msgf("writev failed (file=%s offset=%d)", fw.filePath, offset)
116119
return n, err
117120
}
118121

119-
// Update offset atomically after successful write
120122
fw.fileOffset.Add(int64(n))
121123

122124
return n, nil
@@ -135,7 +137,6 @@ func (fw *SizeFileWriter) Close() error {
135137
// We need to complete the rotation: swap files, then close both
136138
if fw.nextFile != nil && fw.file != nil {
137139
// Complete the rotation by swapping files
138-
// This will send the current file to upload channel
139140
if err := fw.swapFiles(); err != nil && firstErr == nil {
140141
firstErr = fmt.Errorf("failed to complete rotation during close: %w", err)
141142
}
@@ -173,16 +174,9 @@ func (fw *SizeFileWriter) Close() error {
173174
firstErr = err
174175
}
175176

176-
// Send completed file to upload channel (non-blocking) if it has data
177-
if hasData && fw.completedFileChan != nil {
178-
select {
179-
case fw.completedFileChan <- completedFilePath:
180-
// Successfully sent to channel
181-
default:
182-
// Channel full - log warning but don't block close
183-
logger.Warn().Msgf("upload channel full, skipping upload (file=%s)", completedFilePath)
184-
fmt.Printf("[WARNING] Upload channel full, skipping upload for %s\n", completedFilePath)
185-
}
177+
// Rename .tmp to .log (retry 3x, emit metric on failure)
178+
if hasData {
179+
renameTmpToLog(completedFilePath, fw.metricTags)
186180
}
187181

188182
fw.file = nil
@@ -249,9 +243,9 @@ func (fw *SizeFileWriter) rotateIfNeeded() error {
249243

250244
// createNextFile creates a new file for rotation with preallocation
251245
func (fw *SizeFileWriter) createNextFile() error {
252-
// Generate timestamped filename: {baseFileName}_{YYYY-MM-DD_HH-MM-SS}.log
246+
// Generate timestamped filename: {baseFileName}_{YYYY-MM-DD_HH-MM-SS}.log.tmp
253247
timestamp := time.Now().Format("2006-01-02_15-04-05")
254-
nextPath := filepath.Join(fw.baseDir, fmt.Sprintf("%s_%s.log", fw.baseFileName, timestamp))
248+
nextPath := filepath.Join(fw.baseDir, fmt.Sprintf("%s_%s.log.tmp", fw.baseFileName, timestamp))
255249

256250
// Try to open new file with preallocation
257251
file, err := openDirectIOSize(nextPath, fw.preallocateFileSize)
@@ -302,37 +296,29 @@ func (fw *SizeFileWriter) swapFiles() error {
302296
}
303297
}
304298

305-
// Store current file path before closing (for upload)
299+
// Store current file path before closing
306300
completedFilePath := fw.filePath
307301

308302
// Close current file
309303
if err := fw.file.Close(); err != nil {
310304
return fmt.Errorf("failed to close current file: %w", err)
311305
}
312306

313-
// Send completed file to upload channel (non-blocking)
314-
if fw.completedFileChan != nil {
315-
select {
316-
case fw.completedFileChan <- completedFilePath:
317-
// Successfully sent to channel
318-
default:
319-
// Channel full - log warning but don't block rotation
320-
logger.Warn().Msgf("upload channel full, skipping upload (file=%s)", completedFilePath)
321-
fmt.Printf("[WARNING] Upload channel full, skipping upload for %s\n", completedFilePath)
322-
}
323-
}
307+
// Rename .tmp to .log (retry 3x, emit metric on failure)
308+
renameTmpToLog(completedFilePath, fw.metricTags)
324309

325310
// Swap next file to current
326311
fw.file = fw.nextFile
327312
fw.fd = fw.nextFd
328313
fw.filePath = fw.nextFilePath
329314
fw.fileOffset.Store(0) // Reset offset for new file
330315

331-
// Clear next file fields
332316
fw.nextFile = nil
333317
fw.nextFd = 0
334318
fw.nextFilePath = ""
335319

320+
metric.Incr(MetricFileWriterRotationCount, fw.metricTags)
321+
336322
return nil
337323
}
338324

0 commit comments

Comments
 (0)