Skip to content

Commit e032ee0

Browse files
authored
agent-events: add deduplicating web server (netdata#20014)
add deduplicating web server
1 parent c1277e9 commit e032ee0

5 files changed

Lines changed: 497 additions & 22 deletions

File tree

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
server
2+
go.mod
3+
go.sum
Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,26 @@
11
#!/bin/sh
22

3-
go build server.go
3+
# Exit immediately if a command exits with a non-zero status.
4+
set -e
5+
6+
# Optional: Clean slate - remove existing module files
7+
test -f "go.mod" && rm -f go.mod
8+
test -f "go.sum" && rm -f go.sum
9+
10+
# 1. Initialize the Go module
11+
echo "Initializing Go module..."
12+
go mod init server
13+
14+
# 2. Tidy dependencies
15+
echo "Tidying dependencies..."
16+
go mod tidy
17+
18+
# 3. Build the main server executable using '.' for current package
19+
echo "Building server executable..."
20+
go build -o ./server .
21+
22+
# 4. Run the unit tests
23+
echo "Running unit tests..."
24+
go test -v
25+
26+
echo "Build and test script finished."

packaging/tools/agent-events/run.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env bash
22

3-
stdbuf -oL /opt/agent-events/server --port=30001 2>/dev/null \
3+
stdbuf -oL /opt/agent-events/server --port=30001 --dedup-key agent.ephemeral_id --dedup-window 1800 2>/dev/null \
44
| stdbuf -oL log2journal json \
55
--prefix 'AE_' \
66
--inject 'SYSLOG_IDENTIFIER=agent-events' \
Lines changed: 210 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,247 @@
11
package main
22

33
import (
4+
"crypto/sha256"
5+
"encoding/json"
6+
"errors"
47
"flag"
58
"fmt"
69
"io"
710
"log"
811
"net/http"
912
"os"
1013
"strings"
14+
"sync"
15+
"time"
16+
17+
"github.com/tidwall/gjson"
18+
)
19+
20+
// --- Constants ---
21+
const (
22+
maxRequestBodySize = 20 * 1024 // 20 KiB
23+
)
24+
25+
// --- Custom Flag Type for Multi-use --dedup-key ---
26+
type dedupPaths []string
27+
28+
func (d *dedupPaths) String() string { return fmt.Sprintf("%v", *d) }
29+
func (d *dedupPaths) Set(value string) error {
30+
if value == "" {
31+
return fmt.Errorf("dedup-key path cannot be empty")
32+
}
33+
*d = append(*d, value)
34+
return nil
35+
}
36+
37+
// --- Global variables ---
38+
var (
39+
seenIDs map[[32]byte]seenEntry
40+
mapMutex = &sync.Mutex{}
41+
dedupWindow time.Duration
42+
debugMode bool
43+
keyPaths dedupPaths
44+
dedupSeparator string
1145
)
1246

47+
// --- Data Structures ---
48+
type seenEntry struct {
49+
timestamp time.Time
50+
}
51+
52+
// --- Core Logic Functions ---
53+
54+
// checkAndRecordHash accepts the SHA256 hash ([32]byte) for checking.
55+
// It now REFRESHES the timestamp whenever a hash is found,
56+
// effectively creating a sliding deduplication window.
57+
func checkAndRecordHash(hash [32]byte) bool {
58+
now := time.Now()
59+
mapMutex.Lock()
60+
defer mapMutex.Unlock()
61+
62+
var zeroHash [32]byte
63+
if hash == zeroHash {
64+
log.Println("Warning: checkAndRecordHash received potentially zero hash.")
65+
// Decide if zero hash should always be discarded, e.g. return false
66+
}
67+
68+
// Check if the hash exists in the map
69+
entry, found := seenIDs[hash]
70+
71+
if found {
72+
// --- Hash Found ---
73+
// Check if it was a duplicate based on the *previous* timestamp
74+
isRecentDuplicate := now.Sub(entry.timestamp) < dedupWindow
75+
76+
// *** Always update the timestamp to 'now' to refresh the window ***
77+
seenIDs[hash] = seenEntry{timestamp: now}
78+
79+
// Return 'false' if it was a recent duplicate (suppress processing),
80+
// return 'true' if it was found but expired (allow processing).
81+
return !isRecentDuplicate
82+
83+
} else {
84+
// --- Hash Not Found ---
85+
// Record the new hash with the current timestamp
86+
seenIDs[hash] = seenEntry{timestamp: now}
87+
// Return 'true' as this is the first time (or first time after expiry)
88+
return true
89+
}
90+
}
91+
92+
93+
// cleanupExpiredEntries uses the hash ([32]byte) as the key type.
94+
func cleanupExpiredEntries(interval time.Duration) {
95+
ticker := time.NewTicker(interval)
96+
defer ticker.Stop()
97+
cleanedCount := 0
98+
lastCleanupLogTime := time.Now()
99+
for range ticker.C {
100+
mapMutex.Lock()
101+
now := time.Now()
102+
for h, entry := range seenIDs {
103+
if now.Sub(entry.timestamp) >= dedupWindow {
104+
delete(seenIDs, h)
105+
cleanedCount++
106+
}
107+
}
108+
mapMutex.Unlock()
109+
// Simplified periodic logging for cleanup
110+
if cleanedCount > 0 && time.Since(lastCleanupLogTime) > time.Hour {
111+
if debugMode {
112+
log.Printf("Debug: Cleaned up %d expired entries in the past hour.", cleanedCount)
113+
}
114+
cleanedCount = 0 // Reset count after logging
115+
lastCleanupLogTime = time.Now()
116+
}
117+
}
118+
}
119+
120+
// --- HTTP Handler ---
13121
func handler(w http.ResponseWriter, r *http.Request) {
14122
if r.Method != http.MethodPost {
15123
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
16-
log.Printf("Method not allowed: %s\n", r.Method)
124+
log.Printf("Discarded: Method not allowed (%s) from %s", r.Method, r.RemoteAddr)
17125
return
18126
}
19-
20-
// Read the entire request body.
127+
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize)
21128
body, err := io.ReadAll(r.Body)
22129
if err != nil {
23-
http.Error(w, "Error reading request", http.StatusInternalServerError)
24-
log.Printf("Error reading request: %v\n", err)
130+
var maxBytesErr *http.MaxBytesError
131+
if errors.As(err, &maxBytesErr) {
132+
http.Error(w, fmt.Sprintf("Request body exceeds limit (%d bytes)", maxRequestBodySize), http.StatusRequestEntityTooLarge)
133+
log.Printf("Discarded: Request body too large (limit %d bytes) from %s", maxRequestBodySize, r.RemoteAddr)
134+
} else {
135+
http.Error(w, "Error reading request", http.StatusInternalServerError)
136+
log.Printf("Discarded: Error reading request body: %v", err)
137+
}
25138
return
26139
}
27-
defer r.Body.Close()
28140

29-
// Remove all newline characters.
30-
cleaned := strings.ReplaceAll(string(body), "\n", "")
31-
cleaned = strings.ReplaceAll(cleaned, "\r", "")
141+
shouldProcess := true
142+
if len(keyPaths) > 0 {
143+
var keyBuilder strings.Builder
144+
for i, path := range keyPaths {
145+
result := gjson.GetBytes(body, path)
146+
var valueStr string
147+
if result.Exists() { valueStr = result.String() } else { valueStr = "" }
148+
keyBuilder.WriteString(valueStr)
149+
if i < len(keyPaths)-1 { keyBuilder.WriteString(dedupSeparator) }
150+
}
151+
finalKeyString := keyBuilder.String()
152+
dedupHash := sha256.Sum256([]byte(finalKeyString))
153+
if debugMode {
154+
log.Printf("Debug: Generated dedup key string: \"%s\"", finalKeyString)
155+
log.Printf("Debug: Generated dedup hash: %x", dedupHash)
156+
}
32157

33-
// Write to stdout with a single newline at the end.
34-
fmt.Println(cleaned)
158+
// Call the updated checkAndRecordHash function
159+
if !checkAndRecordHash(dedupHash) {
160+
// It was determined to be a duplicate (based on previous timestamp)
161+
shouldProcess = false
162+
if debugMode { log.Printf("Debug: Discarded duplicate hash: %x (timestamp refreshed)", dedupHash) } // Updated log message
163+
// Respond OK for duplicate and stop processing
164+
if _, err := w.Write([]byte("OK")); err != nil { log.Printf("Error writing response after duplicate discard: %v", err) }
165+
return // Exit handler early for duplicates
166+
}
167+
// If we reach here, it was not a recent duplicate (new or expired)
168+
} else {
169+
if debugMode { log.Println("Debug: No --dedup-key flags provided, skipping deduplication.") }
170+
}
35171

36-
// Respond with OK.
37-
if _, err := w.Write([]byte("OK")); err != nil {
38-
log.Printf("Error writing response: %v\n", err)
39-
return
172+
if shouldProcess {
173+
var fullData interface{}
174+
if err := json.Unmarshal(body, &fullData); err != nil {
175+
http.Error(w, "Invalid JSON for full parsing", http.StatusBadRequest)
176+
bodyDetail := ""
177+
if debugMode { bodyDetail = fmt.Sprintf(", Body: %s", string(body)) } else { bodyDetail = fmt.Sprintf(", Body snippet: %s", limitString(string(body), 100)) }
178+
log.Printf("Discarded: Failed to fully parse JSON (post-dedup): %v%s", err, bodyDetail)
179+
return
180+
}
181+
outputBytes, err := json.Marshal(fullData)
182+
if err != nil {
183+
http.Error(w, "Internal Server Error during output marshal", http.StatusInternalServerError)
184+
log.Printf("Discarded: Failed to marshal JSON for output: %v", err)
185+
return
186+
}
187+
fmt.Println(string(outputBytes))
188+
if _, err := w.Write([]byte("OK")); err != nil { log.Printf("Error writing OK response: %v", err) }
40189
}
41190
}
42191

192+
// --- Main Function ---
43193
func main() {
44-
// Configure logging to write to stderr.
45194
log.SetOutput(os.Stderr)
46195
log.SetFlags(log.LstdFlags | log.Lshortfile)
47196

48-
// Parse the port from the command line.
197+
// --- Command Line Flags ---
49198
port := flag.Int("port", 8080, "Port to listen on")
199+
dedupSeconds := flag.Int("dedup-window", 1800, "Deduplication window in seconds (e.g., 1800 for 30 minutes)")
200+
flag.BoolVar(&debugMode, "debug", false, "Enable debug mode for verbose logging")
201+
flag.Var(&keyPaths, "dedup-key", "JSON path (dot-notation) for deduplication key (can be used multiple times)")
202+
flag.StringVar(&dedupSeparator, "dedup-separator", "-", "Separator used between values from multiple --dedup-key paths")
50203
flag.Parse()
51204

205+
seenIDs = make(map[[32]byte]seenEntry)
206+
dedupWindow = time.Duration(*dedupSeconds) * time.Second
207+
208+
if dedupWindow > 0 && len(keyPaths) > 0 {
209+
cleanupInterval := dedupWindow / 10
210+
if cleanupInterval < 1*time.Minute { cleanupInterval = 1 * time.Minute } else if cleanupInterval > 15*time.Minute { cleanupInterval = 15 * time.Minute }
211+
log.Printf("Cleanup goroutine started. Interval: %v", cleanupInterval)
212+
go cleanupExpiredEntries(cleanupInterval)
213+
} else if dedupWindow <= 0 && len(keyPaths) > 0 {
214+
log.Println("Warning: Deduplication keys provided, but window is zero or negative. Deduplication effectively disabled.")
215+
}
216+
217+
// --- Configure HTTP Server ---
218+
readTimeout := 10 * time.Second
219+
writeTimeout := 10 * time.Second
220+
idleTimeout := 60 * time.Second
221+
server := &http.Server{
222+
Addr: fmt.Sprintf(":%d", *port),
223+
Handler: http.DefaultServeMux,
224+
ReadTimeout: readTimeout,
225+
WriteTimeout: writeTimeout,
226+
IdleTimeout: idleTimeout,
227+
}
52228
http.HandleFunc("/", handler)
53-
log.Printf("Server listening on port %d\n", *port)
54-
if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
55-
log.Fatalf("Server failed: %v\n", err)
229+
230+
// --- Start Server ---
231+
log.Printf("Server listening on port %d", *port)
232+
log.Printf("Maximum request body size: %d bytes", maxRequestBodySize)
233+
if len(keyPaths) > 0 {
234+
log.Printf("Deduplication enabled: Keys=%v, Separator='%s', Window=%v (Sliding window: timestamp refreshed on duplicate)", keyPaths, dedupSeparator, dedupWindow) // Updated log message
235+
} else {
236+
log.Println("Deduplication disabled (no --dedup-key specified).")
56237
}
238+
log.Printf("Debug mode enabled: %t", debugMode)
239+
log.Printf("Server timeouts -> Read: %v, Write: %v, Idle: %v", readTimeout, writeTimeout, idleTimeout)
240+
log.Fatal(server.ListenAndServe())
241+
}
242+
243+
// --- Helper Functions ---
244+
func limitString(s string, maxLen int) string {
245+
if len(s) <= maxLen { return s }
246+
return s[:maxLen] + "..."
57247
}

0 commit comments

Comments
 (0)