Skip to content

Commit 95e380a

Browse files
committed
feat: gotls Upload sequence; Fix the issue where disorderly arrival causes the file content to be uninterpretable
1 parent 15e4c93 commit 95e380a

7 files changed

Lines changed: 363 additions & 42 deletions

File tree

internal/probe/base/base_probe.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (p *BaseProbe) StartPerfEventReader(em *ebpf.Map, decoder domain.EventDecod
262262
return errors.NewEBPFAttachError(em.String(), err)
263263
}
264264

265-
p.readers = append(p.readers, rd)
265+
p.TrackReader(rd)
266266

267267
p.logger.Info().
268268
Str("map", em.String()).
@@ -273,6 +273,16 @@ func (p *BaseProbe) StartPerfEventReader(em *ebpf.Map, decoder domain.EventDecod
273273
return nil
274274
}
275275

276+
// TrackReader registers a reader to be closed when the probe shuts down.
277+
// Probes that implement a custom perf read loop should call TrackReader with the same reader
278+
// instance they pass to the goroutine, matching StartPerfEventReader lifecycle.
279+
func (p *BaseProbe) TrackReader(c closer) {
280+
if c == nil {
281+
return
282+
}
283+
p.readers = append(p.readers, c)
284+
}
285+
276286
// perfEventLoop reads events from a perf buffer.
277287
func (p *BaseProbe) perfEventLoop(rd *perf.Reader, em *ebpf.Map, decoder domain.EventDecoder) {
278288
for {

internal/probe/gotls/event.go

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,27 @@ const (
3636
// C struct layout (go_tls_event):
3737
//
3838
// u64 ts_ns; // offset 0, size 8
39-
// u32 pid; // offset 8, size 4
40-
// u32 tid; // offset 12, size 4
41-
// s32 data_len; // offset 16, size 4
42-
// u8 event_type; // offset 20, size 1
43-
// u8 pad[3]; // offset 21, size 3 (alignment padding)
44-
// u32 fd; // offset 24, size 4
45-
// u8 src_ip[16]; // offset 28, size 16
46-
// u16 src_port; // offset 44, size 2
47-
// u16 pad2; // offset 46, size 2 (alignment padding)
48-
// u8 dst_ip[16]; // offset 48, size 16
49-
// u16 dst_port; // offset 64, size 2
50-
// u8 ip_version; // offset 66, size 1 (4=IPv4, 6=IPv6)
51-
// u8 pad3; // offset 67, size 1 (alignment padding)
52-
// char comm[16]; // offset 68, size 16
53-
// char data[...]; // offset 84, variable
39+
// u64 seq; // offset 8, size 8 (per-CPU counter)
40+
// u32 emit_cpu; // offset 16, size 4
41+
// u32 pid; // offset 20, size 4
42+
// u32 tid; // offset 24, size 4
43+
// s32 data_len; // offset 28, size 4
44+
// u8 event_type; // offset 32, size 1
45+
// u8 pad[3]; // offset 33, size 3 (alignment padding)
46+
// u32 fd; // offset 36, size 4
47+
// u8 src_ip[16]; // offset 40, size 16
48+
// u16 src_port; // offset 56, size 2
49+
// u16 pad2; // offset 58, size 2 (alignment padding)
50+
// u8 dst_ip[16]; // offset 60, size 16
51+
// u16 dst_port; // offset 76, size 2
52+
// u8 ip_version; // offset 78, size 1 (4=IPv4, 6=IPv6)
53+
// u8 pad3; // offset 79, size 1 (alignment padding)
54+
// char comm[16]; // offset 80, size 16
55+
// char data[...]; // offset 96, variable
5456
type GoTLSDataEvent struct {
5557
Timestamp uint64 `json:"timestamp"`
58+
Seq uint64 `json:"seq"`
59+
EmitCPU uint32 `json:"emit_cpu"`
5660
Pid uint32 `json:"pid"`
5761
Tid uint32 `json:"tid"`
5862
DataLen int32 `json:"dataLen"`
@@ -69,6 +73,8 @@ type GoTLSDataEvent struct {
6973
Comm [16]byte `json:"comm"`
7074
Data []byte `json:"data"`
7175
Tuple string `json:"tuple"`
76+
// BpfMonoNs is bpf_ktime_get_ns() from the probe (stable ordering with Seq/EmitCPU); not wall clock.
77+
BpfMonoNs uint64 `json:"-"`
7278
}
7379

7480
// DecodeFromBytes deserializes the event from raw eBPF data.
@@ -79,6 +85,13 @@ func (e *GoTLSDataEvent) DecodeFromBytes(data []byte) error {
7985
if err := binary.Read(buf, binary.LittleEndian, &e.Timestamp); err != nil {
8086
return errors.NewEventDecodeError("gotls.Timestamp", err)
8187
}
88+
e.BpfMonoNs = e.Timestamp
89+
if err := binary.Read(buf, binary.LittleEndian, &e.Seq); err != nil {
90+
return errors.NewEventDecodeError("gotls.Seq", err)
91+
}
92+
if err := binary.Read(buf, binary.LittleEndian, &e.EmitCPU); err != nil {
93+
return errors.NewEventDecodeError("gotls.EmitCPU", err)
94+
}
8295
if err := binary.Read(buf, binary.LittleEndian, &e.Pid); err != nil {
8396
return errors.NewEventDecodeError("gotls.Pid", err)
8497
}
@@ -149,6 +162,18 @@ func (e *GoTLSDataEvent) DecodeFromBytes(data []byte) error {
149162
return nil
150163
}
151164

165+
// LessGoTLSDataEventByPerfOrder compares two events for emit order after merging per-CPU perf buffers.
166+
// Ordering matches gimli (bpf monotonic time, then EmitCPU, then per-CPU Seq).
167+
func LessGoTLSDataEventByPerfOrder(a, b *GoTLSDataEvent) bool {
168+
if a.BpfMonoNs != b.BpfMonoNs {
169+
return a.BpfMonoNs < b.BpfMonoNs
170+
}
171+
if a.EmitCPU != b.EmitCPU {
172+
return a.EmitCPU < b.EmitCPU
173+
}
174+
return a.Seq < b.Seq
175+
}
176+
152177
// GetTimestamp returns the event timestamp in nanoseconds.
153178
func (e *GoTLSDataEvent) GetTimestamp() uint64 {
154179
return e.Timestamp

internal/probe/gotls/event_test.go

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
// buildEventBytes constructs a raw eBPF byte payload for GoTLSDataEvent.
2828
// Offsets match the C struct go_tls_event exactly.
2929
func buildEventBytes(t *testing.T,
30-
timestamp uint64, pid, tid uint32, dataLen int32, eventType uint8,
30+
timestamp uint64, seq uint64, emitCPU uint32, pid, tid uint32, dataLen int32, eventType uint8,
3131
fd uint32, srcIP [16]byte, srcPort uint16, dstIP [16]byte, dstPort uint16,
3232
ipVersion uint8, comm [16]byte, payload []byte,
3333
) []byte {
@@ -38,23 +38,25 @@ func buildEventBytes(t *testing.T,
3838
t.Fatalf("binary.Write failed: %v", err)
3939
}
4040
}
41-
write(timestamp) // offset 0, u64
42-
write(pid) // offset 8, u32
43-
write(tid) // offset 12, u32
44-
write(dataLen) // offset 16, s32
45-
write(eventType) // offset 20, u8
46-
write([3]byte{}) // offset 21, pad[3]
47-
write(fd) // offset 24, u32
48-
write(srcIP) // offset 28, u8[16]
49-
write(srcPort) // offset 44, u16
50-
write([2]byte{}) // offset 46, pad2
51-
write(dstIP) // offset 48, u8[16]
52-
write(dstPort) // offset 64, u16
53-
write(ipVersion) // offset 66, u8
54-
write(uint8(0)) // offset 67, pad3
55-
write(comm) // offset 68, char[16]
41+
write(timestamp) // offset 0, u64 ts_ns
42+
write(seq) // offset 8, u64 seq
43+
write(emitCPU) // offset 16, u32 emit_cpu
44+
write(pid) // offset 20, u32
45+
write(tid) // offset 24, u32
46+
write(dataLen) // offset 28, s32
47+
write(eventType) // offset 32, u8
48+
write([3]byte{}) // offset 33, pad[3]
49+
write(fd) // offset 36, u32
50+
write(srcIP) // offset 40, u8[16]
51+
write(srcPort) // offset 56, u16
52+
write([2]byte{}) // offset 58, pad2
53+
write(dstIP) // offset 60, u8[16]
54+
write(dstPort) // offset 76, u16
55+
write(ipVersion) // offset 78, u8
56+
write(uint8(0)) // offset 79, pad3
57+
write(comm) // offset 80, char[16]
5658
if len(payload) > 0 {
57-
buf.Write(payload) // offset 84, variable data
59+
buf.Write(payload) // offset 96, variable data
5860
}
5961
return buf.Bytes()
6062
}
@@ -84,7 +86,7 @@ func TestGoTLSDataEvent_DecodeFromBytes_IPv4_Write(t *testing.T) {
8486
payload := []byte("GET / HTTP/1.1\r\n")
8587

8688
raw := buildEventBytes(t,
87-
1000000, 42, 99, int32(len(payload)), 0, // eventType=WRITE
89+
1000000, 1, 0, 42, 99, int32(len(payload)), 0, // eventType=WRITE
8890
7, srcIP, 12345, dstIP, 443, 4, // ipVersion=4
8991
commBytes("curl"), payload,
9092
)
@@ -97,6 +99,15 @@ func TestGoTLSDataEvent_DecodeFromBytes_IPv4_Write(t *testing.T) {
9799
if e.Timestamp != 1000000 {
98100
t.Errorf("Timestamp = %d, want 1000000", e.Timestamp)
99101
}
102+
if e.BpfMonoNs != 1000000 {
103+
t.Errorf("BpfMonoNs = %d, want 1000000", e.BpfMonoNs)
104+
}
105+
if e.Seq != 1 {
106+
t.Errorf("Seq = %d, want 1", e.Seq)
107+
}
108+
if e.EmitCPU != 0 {
109+
t.Errorf("EmitCPU = %d, want 0", e.EmitCPU)
110+
}
100111
if e.Pid != 42 {
101112
t.Errorf("Pid = %d, want 42", e.Pid)
102113
}
@@ -141,7 +152,7 @@ func TestGoTLSDataEvent_DecodeFromBytes_IPv4_Read(t *testing.T) {
141152
payload := []byte("HTTP/1.1 200 OK\r\n")
142153

143154
raw := buildEventBytes(t,
144-
2000000, 100, 200, int32(len(payload)), 1, // eventType=READ
155+
2000000, 2, 0, 100, 200, int32(len(payload)), 1, // eventType=READ
145156
5, srcIP, 443, dstIP, 54321, 4,
146157
commBytes("myapp"), payload,
147158
)
@@ -174,7 +185,7 @@ func TestGoTLSDataEvent_DecodeFromBytes_IPv6(t *testing.T) {
174185

175186
payload := []byte("hello")
176187
raw := buildEventBytes(t,
177-
3000000, 7, 8, int32(len(payload)), 0,
188+
3000000, 3, 0, 7, 8, int32(len(payload)), 0,
178189
3, ipv6Bytes(src6), 8080, ipv6Bytes(dst6), 9090, 6, // ipVersion=6
179190
commBytes("server"), payload,
180191
)
@@ -196,7 +207,7 @@ func TestGoTLSDataEvent_DecodeFromBytes_IPv6(t *testing.T) {
196207

197208
func TestGoTLSDataEvent_DecodeFromBytes_ZeroDataLen(t *testing.T) {
198209
raw := buildEventBytes(t,
199-
1000, 1, 2, 0, 0, // dataLen=0
210+
1000, 4, 0, 1, 2, 0, 0, // dataLen=0
200211
1, ipv4Bytes(1, 2, 3, 4), 100, ipv4Bytes(5, 6, 7, 8), 200, 4,
201212
commBytes("app"), nil,
202213
)
@@ -212,9 +223,27 @@ func TestGoTLSDataEvent_DecodeFromBytes_ZeroDataLen(t *testing.T) {
212223
}
213224
}
214225

226+
func TestLessGoTLSDataEventByPerfOrder(t *testing.T) {
227+
a := &GoTLSDataEvent{BpfMonoNs: 100, EmitCPU: 1, Seq: 5}
228+
b := &GoTLSDataEvent{BpfMonoNs: 200, EmitCPU: 0, Seq: 1}
229+
if !LessGoTLSDataEventByPerfOrder(a, b) {
230+
t.Fatal("expected a before b (mono time)")
231+
}
232+
sameT := &GoTLSDataEvent{BpfMonoNs: 100, EmitCPU: 2, Seq: 1}
233+
otherCPU := &GoTLSDataEvent{BpfMonoNs: 100, EmitCPU: 1, Seq: 99}
234+
if !LessGoTLSDataEventByPerfOrder(otherCPU, sameT) {
235+
t.Fatal("expected CPU tie-break")
236+
}
237+
sameTS := &GoTLSDataEvent{BpfMonoNs: 100, EmitCPU: 1, Seq: 1}
238+
sameTS2 := &GoTLSDataEvent{BpfMonoNs: 100, EmitCPU: 1, Seq: 2}
239+
if !LessGoTLSDataEventByPerfOrder(sameTS, sameTS2) {
240+
t.Fatal("expected seq tie-break")
241+
}
242+
}
243+
215244
func TestGoTLSDataEvent_DecodeFromBytes_TimestampZeroFallback(t *testing.T) {
216245
raw := buildEventBytes(t,
217-
0, 1, 2, 0, 0, // timestamp=0 → should be filled by time.Now()
246+
0, 5, 0, 1, 2, 0, 0, // timestamp=0 → should be filled by time.Now()
218247
1, ipv4Bytes(1, 2, 3, 4), 80, ipv4Bytes(5, 6, 7, 8), 8080, 4,
219248
commBytes("app"), nil,
220249
)
@@ -241,7 +270,7 @@ func TestGoTLSDataEvent_DecodeFromBytes_TruncatedInput(t *testing.T) {
241270
func TestGoTLSDataEvent_DecodeFromBytes_DataLenExceedsBuffer(t *testing.T) {
242271
// claim dataLen=100 but provide no payload bytes
243272
raw := buildEventBytes(t,
244-
1000, 1, 2, 100, 0, // dataLen=100
273+
1000, 6, 0, 1, 2, 100, 0, // dataLen=100
245274
1, ipv4Bytes(1, 2, 3, 4), 80, ipv4Bytes(5, 6, 7, 8), 8080, 4,
246275
commBytes("app"), nil, // no payload
247276
)
@@ -584,7 +613,7 @@ func TestGoTLSDataEvent_DecodeFromBytes_RoundTrip(t *testing.T) {
584613
copy(comm[:], "myservice")
585614

586615
raw := buildEventBytes(t,
587-
999888777, 12345, 67890, int32(len(wantPayload)), 0,
616+
999888777, 7, 0, 12345, 67890, int32(len(wantPayload)), 0,
588617
8, ipv4Bytes(172, 16, 0, 1), 54321, ipv4Bytes(172, 16, 0, 2), 443, 4,
589618
comm, wantPayload,
590619
)
@@ -594,6 +623,12 @@ func TestGoTLSDataEvent_DecodeFromBytes_RoundTrip(t *testing.T) {
594623
t.Fatalf("DecodeFromBytes error: %v", err)
595624
}
596625

626+
if e.Seq != 7 {
627+
t.Errorf("Seq = %d, want 7", e.Seq)
628+
}
629+
if e.EmitCPU != 0 {
630+
t.Errorf("EmitCPU = %d, want 0", e.EmitCPU)
631+
}
597632
if e.Pid != 12345 {
598633
t.Errorf("Pid = %d, want 12345", e.Pid)
599634
}

internal/probe/gotls/gotls_probe.go

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package gotls
1717
import (
1818
"bytes"
1919
"context"
20+
stderrors "errors"
2021
"fmt"
2122
"io"
2223
"math"
@@ -25,6 +26,7 @@ import (
2526
"strings"
2627

2728
"github.com/cilium/ebpf"
29+
"github.com/cilium/ebpf/perf"
2830
manager "github.com/gojue/ebpfmanager"
2931
"golang.org/x/sys/unix"
3032

@@ -137,7 +139,13 @@ func (p *Probe) Start(ctx context.Context) error {
137139

138140
// Start event readers for all configured maps
139141
for em, decoder := range p.eventFuncMaps {
140-
if err := p.StartPerfEventReader(em, decoder); err != nil {
142+
var err error
143+
if _, ok := decoder.(*tlsDataEventDecoder); ok {
144+
err = p.startGoTLSOrderedPerfEventReader(em, decoder)
145+
} else {
146+
err = p.StartPerfEventReader(em, decoder)
147+
}
148+
if err != nil {
141149
return err
142150
}
143151
p.Logger().Debug().
@@ -181,6 +189,84 @@ func (p *Probe) retrieveEventMaps() error {
181189
return nil
182190
}
183191

192+
// startGoTLSOrderedPerfEventReader reads TLS plaintext perf events and emits them ordered by
193+
// (BpfMonoNs, EmitCPU, Seq), matching gimli's merge semantics across CPUs.
194+
func (p *Probe) startGoTLSOrderedPerfEventReader(em *ebpf.Map, decoder domain.EventDecoder) error {
195+
if em == nil {
196+
return errors.New(errors.ErrCodeEBPFMapAccess, "eBPF map cannot be nil")
197+
}
198+
if decoder == nil {
199+
return errors.New(errors.ErrCodeConfiguration, "event decoder cannot be nil")
200+
}
201+
mapSize := p.config.GetPerCpuMapSize()
202+
rd, err := perf.NewReader(em, mapSize)
203+
if err != nil {
204+
return errors.NewEBPFAttachError(em.String(), err)
205+
}
206+
p.TrackReader(rd)
207+
p.Logger().Info().
208+
Str("map", em.String()).
209+
Int("size_mb", mapSize/1024/1024).
210+
Msg("Perf event reader started (GoTLS reorder)")
211+
go p.goTLSOrderedPerfLoop(rd, em, decoder)
212+
return nil
213+
}
214+
215+
func (p *Probe) goTLSOrderedPerfLoop(rd *perf.Reader, em *ebpf.Map, decoder domain.EventDecoder) {
216+
reorder := newGoTLSPerfReorder()
217+
defer func() {
218+
for _, ev := range reorder.flushAll() {
219+
if err := p.Dispatcher().Dispatch(ev); err != nil {
220+
p.Logger().Warn().Err(err).Msg("Failed to dispatch reordered GoTLS event")
221+
}
222+
}
223+
}()
224+
for {
225+
select {
226+
case <-p.Context().Done():
227+
p.Logger().Debug().Msg("GoTLS perf reader stopping")
228+
return
229+
default:
230+
}
231+
record, err := rd.Read()
232+
if err != nil {
233+
if stderrors.Is(err, perf.ErrClosed) {
234+
return
235+
}
236+
p.Logger().Warn().Err(err).Msg("Error reading from perf buffer")
237+
continue
238+
}
239+
if record.LostSamples != 0 {
240+
p.Logger().Warn().
241+
Uint64("lost_samples", record.LostSamples).
242+
Msg("Perf buffer full, samples lost")
243+
continue
244+
}
245+
event, err := decoder.Decode(em, record.RawSample)
246+
if err != nil {
247+
if stderrors.Is(err, errors.ErrEventNotReady) {
248+
p.Logger().Debug().Msg("Event not ready, skipping")
249+
continue
250+
}
251+
p.Logger().Warn().Err(err).Msg("Failed to decode event")
252+
continue
253+
}
254+
p.Logger().Debug().Str("event", event.String()).Msg("Perf event decoded")
255+
gte, ok := event.(*GoTLSDataEvent)
256+
if !ok {
257+
if err := p.Dispatcher().Dispatch(event); err != nil {
258+
p.Logger().Warn().Err(err).Msg("Failed to dispatch event")
259+
}
260+
continue
261+
}
262+
for _, ev := range reorder.push(gte) {
263+
if err := p.Dispatcher().Dispatch(ev); err != nil {
264+
p.Logger().Warn().Err(err).Msg("Failed to dispatch reordered GoTLS event")
265+
}
266+
}
267+
}
268+
}
269+
184270
// Events returns the eBPF maps for event collection.
185271
func (p *Probe) Events() []*ebpf.Map {
186272
return p.eventMaps
@@ -554,7 +640,6 @@ type tlsDataEventDecoder struct {
554640

555641
func (d *tlsDataEventDecoder) Decode(_ *ebpf.Map, data []byte) (domain.Event, error) {
556642
event := &GoTLSDataEvent{}
557-
fmt.Println("Decoding TLSDataEvent from bytes, data length:", len(data))
558643
if err := event.DecodeFromBytes(data); err != nil {
559644
return nil, err
560645
}

0 commit comments

Comments
 (0)