pubsub: switch from unixpacket to framed stream sockets #5644

Draft
rucoder wants to merge 3 commits into lf-edge:master from rucoder:rucoder/pubsub-msg-size

Conversation

@rucoder
Contributor

@rucoder rucoder commented Mar 2, 2026

pubsub: switch from unixpacket to framed stream sockets

Summary

Replaces raw unixpacket (SOCK_SEQPACKET) sockets with unix (SOCK_STREAM) sockets using the getlantern/framed library with 4-byte little-endian length-prefix framing (EnableBigFrames()). This removes the hard OS-level 65 KB per-message limit and replaces it with a configurable 10 MB application-level limit.

This is three commits:

  1. Switch to framed stream sockets — wire protocol change
  2. FrameReader arena + maxsize enforcement — per-reader reusable buffer (avoids 65 KB pool alloc per subscriber); enforces maxsize on both send and receive
  3. FrameReader stats via GlobalConfig — optional per-topic instrumentation (disabled by default; enabled via debug.pubsub.stats.interval controller key)

What Changed

Commit 1: Wire protocol

  • unixpacket → unix stream sockets in socketdriver and reverse pubsub
  • Wrap connections with framed.Reader/framed.Writer + EnableBigFrames() (4-byte length prefix, little-endian)
  • Raise maxsize from 65535 to 10 MB (applies after base64 encoding, so effective raw JSON limit ≈ 7.5 MB)
  • Remove ConnReadCheck; framed.ReadFrame() blocks correctly on stream sockets

Commit 2: FrameReader arena + maxsize

  • FrameReader: per-reader buffer starting at 1 KB, growing 25% on demand, never shrinking
    • Replaces global sync.Pool of 65 KB fixed buffers → ~95% memory reduction (measured: 63 KB arena vs 1.3 MB old pool for 21 active topics)
  • Enforce maxsize consistently on both send (> maxsize) and receive (> maxsize)
  • CheckMaxSize helper for callers to validate before publishing
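A minimal sketch of the Commit 2 arena policy described above: start at 1 KB, grow by 25% (or to the requested size if larger), never shrink, and reject anything over maxsize. The names here (frameArena, ensure) are hypothetical stand-ins, not the PR's actual identifiers:

```go
package main

import "fmt"

const maxsize = 10 * 1024 * 1024 // 10 MB application-level limit

// frameArena is a per-reader reusable buffer: starts at 1 KB, grows by
// 25% (at least to the requested size) on demand, and never shrinks.
type frameArena struct {
	buf   []byte
	grows int
}

func newFrameArena() *frameArena {
	return &frameArena{buf: make([]byte, 1024)}
}

// ensure returns a slice of at least n bytes, growing the arena if needed.
// It returns an error instead of allocating when n exceeds maxsize.
func (a *frameArena) ensure(n int) ([]byte, error) {
	if n > maxsize {
		return nil, fmt.Errorf("frame of %d bytes exceeds max %d", n, maxsize)
	}
	if n > len(a.buf) {
		grown := len(a.buf) + len(a.buf)/4 // grow by 25%
		if grown < n {
			grown = n
		}
		a.buf = make([]byte, grown)
		a.grows++
	}
	return a.buf[:n], nil
}

func main() {
	a := newFrameArena()
	a.ensure(25 * 1024) // e.g. a ConfigItemValueMap-sized frame
	fmt.Println(len(a.buf), a.grows)
}
```

Because the buffer is reused across reads instead of taken from a fixed 65 KB pool, steady-state memory tracks the largest frame each topic actually sees.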

Commit 3: Stats instrumentation via GlobalConfig

  • Per-FrameReader atomic counters: frames, bytes, grows, max frame size, 9-bucket size histogram
  • Global registry (topic → stats); exported via GetAllReaderStats()
  • Optional CSV logger (StartStatsLogger) + matplotlib visualizer (plot-frame-stats.py)
  • Activated dynamically from the controller via debug.pubsub.stats.interval (seconds; 0 = off)
  • newFrameReaderInternal (no stats) used for publisher's single handshake read to avoid stats key collision
  • Bucket atomics use sync/atomic to avoid data race between reader and stats goroutines
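The Commit 3 counters could be sketched roughly as follows; the bucket bounds and field names are assumptions for illustration, not the PR's exact histogram:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// frameSizeBuckets are illustrative upper bounds for a 9-bucket size
// histogram (the last bucket catches anything over 10 MB).
var frameSizeBuckets = []uint64{256, 1 << 10, 4 << 10, 16 << 10,
	64 << 10, 256 << 10, 1 << 20, 10 << 20}

// readerStats mirrors the per-FrameReader counters: every field is
// updated with sync/atomic so the reader goroutine and the stats
// logger goroutine never race.
type readerStats struct {
	frames   uint64
	bytes    uint64
	maxFrame uint64
	buckets  [9]uint64
}

func (s *readerStats) record(n uint64) {
	atomic.AddUint64(&s.frames, 1)
	atomic.AddUint64(&s.bytes, n)
	// update maxFrame with a CAS loop since there is no atomic max
	for {
		cur := atomic.LoadUint64(&s.maxFrame)
		if n <= cur || atomic.CompareAndSwapUint64(&s.maxFrame, cur, n) {
			break
		}
	}
	i := 0
	for i < len(frameSizeBuckets) && n > frameSizeBuckets[i] {
		i++
	}
	atomic.AddUint64(&s.buckets[i], 1)
}

func main() {
	var s readerStats
	s.record(200)     // lands in the <=256B bucket
	s.record(3 << 10) // lands in the <=4KB bucket
	fmt.Println(atomic.LoadUint64(&s.frames), atomic.LoadUint64(&s.maxFrame))
}
```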

Stats Run Results (4-minute boot on QEMU)


FrameReader stats after 4-minute EVE boot

Key findings:

  • Memory: Arena uses ~63 KB vs old pool's ~1.3 MB (21 topics × 65 KB) — 95% reduction
  • 18 of 21 topics never grow beyond the 1 KB initial buffer
  • 3 topics that grow: ConfigItemValueMap (24.4 KB, 1 grow), DeviceNetworkStatus (7.0 KB, 2 grows), DevicePortConfigList (2.7 KB, 1 grow)
  • All frames ≤ 4 KB during boot — well within the 10 MB limit
  • Conclusion: 1 KB default buffer size is correct; no tuning needed

Note: The current branch has debug.pubsub.stats.interval default set to 1s temporarily for continued profiling while onboarding a real device. Will be set back to 0 (off by default) before merge.

How to Test

# Unit tests
cd pkg/pillar && make test

# Enable stats on a running device (via controller or local override):
# Set debug.pubsub.stats.interval = 30 (seconds)
# Stats CSV written to /persist/pubsub-frame-stats.csv
# Visualize with:
python3 pkg/pillar/pubsub/socketdriver/plot-frame-stats.py /persist/pubsub-frame-stats.csv

Changelog Notes

Increased pubsub IPC message size limit from 65 KB to 10 MB by switching from raw unixpacket datagram sockets to length-prefixed stream sockets. Memory usage reduced ~95% via per-reader arena buffers. Optional per-topic stats collection available via debug.pubsub.stats.interval controller key.

PR Backports

  • 16.0-stable: No backport.
  • 14.5-stable: No backport.
  • 13.4-stable: No backport.

Checklist

  • I've provided a proper description

  • I've added the proper documentation

  • I've tested my PR on amd64 device (QEMU + onboarding in progress)

  • I've tested my PR on arm64 device

  • I've written the test verification instructions

  • I've set the proper labels to this PR

  • I've checked the boxes above, or I've provided a good reason why I didn't check them.

@rene
Contributor

rene commented Mar 2, 2026

@eriknordmark, @milan-zededa, please let's take a closer look at this PR since it touches critical pubsub infrastructure...

@rucoder
Contributor Author

rucoder commented Mar 2, 2026

@rene @eriknordmark @milan-zededa I did not touch "pubsub-large" yet. This is an experimental PR. I'm 99% sure it works, but let's test.


Copilot AI left a comment


Pull request overview

This PR updates pubsub’s IPC transport to remove the historical ~65KB message size ceiling by switching from unixpacket sockets to unix stream sockets and introducing length-prefixed framing using the vendored getlantern/framed library with big frames enabled.

Changes:

  • Switch pubsub socket transport from unixpacket to unix stream sockets.
  • Wrap IPC reads/writes with framed.Reader/framed.Writer using EnableBigFrames().
  • Increase the socketdriver maxsize limit to 10MB and update related tests/comments.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.

File summary:
pkg/pillar/pubsub/socketdriver/subscribe.go Subscriber now dials unix and reads framed messages.
pkg/pillar/pubsub/socketdriver/publish.go Publisher now accepts unix stream conns, reads framed requests, and writes framed updates.
pkg/pillar/pubsub/socketdriver/driver.go Raises maxsize to 10MB; adds framed reader/writer helpers and removes buffer pool.
pkg/pillar/pubsub/reverse/subscribe.go Reverse subscriber now listens on unix and reads framed messages.
pkg/pillar/pubsub/reverse/publish.go Reverse publisher now dials unix and writes framed messages.
pkg/pillar/pubsub/checkmaxsize_test.go Updates test commentary/cases to reflect removal of the old ~65KB boundary behavior.


Comment on lines 409 to 415
s.log.Tracef("sendUpdate(%s): key %s\n", s.name, key)
// base64-encode to avoid having spaces in the key and val
sendKey := base64.StdEncoding.EncodeToString([]byte(key))
sendVal := base64.StdEncoding.EncodeToString(val)
buf := fmt.Sprintf("update %s %s %s", s.topic, sendKey, sendVal)
if len(buf) >= maxsize {
s.log.Fatalf("Too large message (%d bytes) sent to %s topic %s key %s",
len(buf), s.name, s.topic, key)
}
_, err := sock.Write([]byte(buf))
_, err := writer.Write([]byte(buf))
return err

Copilot AI Mar 2, 2026


These send* methods no longer enforce the driver maxsize limit. Since PublicationImpl.Publish() does not call CheckMaxSize(), oversized values can now be sent over IPC, bypassing the intended 10MB cap and potentially causing large allocations/pressure on subscribers. Reintroduce a size check (ideally returning an error rather than Fatal) before writer.Write(), or enforce maxsize at publish/serialize time for all outgoing messages.
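A sketch of what the reviewer suggests: serialize, check against maxsize, and return an error instead of calling Fatal so callers can react. buildUpdate is a hypothetical helper, not code from the PR:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

const maxsize = 10 * 1024 * 1024 // driver's application-level limit

// buildUpdate serializes an update message the same way sendUpdate does
// (base64-encoded key and value) and returns an error, rather than
// Fatal, when the result exceeds maxsize. Illustrative only.
func buildUpdate(topic, key string, val []byte) (string, error) {
	sendKey := base64.StdEncoding.EncodeToString([]byte(key))
	sendVal := base64.StdEncoding.EncodeToString(val)
	buf := fmt.Sprintf("update %s %s %s", topic, sendKey, sendVal)
	if len(buf) >= maxsize {
		return "", fmt.Errorf("message of %d bytes for topic %s key %s exceeds max %d",
			len(buf), topic, key, maxsize)
	}
	return buf, nil
}

func main() {
	msg, err := buildUpdate("DeviceNetworkStatus", "global", []byte(`{"ok":true}`))
	fmt.Println(err == nil, len(msg) > 0)
}
```

Note the base64 overhead: an 8 MiB raw value expands to roughly 10.7 MB encoded, which is why the effective raw limit sits below the 10 MB maxsize.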

Comment on lines +441 to +442
// CheckMaxSize returns an error if too large.
// With framed big frames the limit is ~4GB, so this is unlikely to trigger.

Copilot AI Mar 2, 2026


The comment on CheckMaxSize is misleading: the framed transport may support ~4GB frames, but this driver still enforces maxsize (currently 10MB) and this function can definitely trigger. Please update the comment to reflect the actual enforced limit and its purpose (protecting memory/IPC).

Suggested change
// CheckMaxSize returns an error if too large.
// With framed big frames the limit is ~4GB, so this is unlikely to trigger.
// CheckMaxSize returns an error if the serialized message exceeds maxsize.
// Even though the framed transport can support ~4GB frames, this driver enforces
// a much smaller application-level limit (currently 10MB) to protect memory/IPC.

Comment on lines 244 to 251
// NewFramedReader wraps a net.Conn with a framed.Reader that uses big frames
// (32-bit length prefix), allowing messages up to ~4GB.
func NewFramedReader(conn net.Conn) *framed.Reader {
r := framed.NewReader(conn)
r.EnableBigFrames()
return r
}


Copilot AI Mar 2, 2026


NewFramedReader/NewFramedWriter enable big frames (~4GB), but the framed.Reader.ReadFrame() API allocates based on the length prefix and does not enforce a maximum on reads (only writes check maxFrameLength). Given this package defines maxsize (10MB), consider centralizing a bounded read helper in socketdriver (e.g., read length, reject > maxsize, then read) so all IPC consumers are protected from oversized/malicious frames.

Suggested change
// NewFramedReader wraps a net.Conn with a framed.Reader that uses big frames
// (32-bit length prefix), allowing messages up to ~4GB.
func NewFramedReader(conn net.Conn) *framed.Reader {
r := framed.NewReader(conn)
r.EnableBigFrames()
return r
}
// NewFramedReader wraps a net.Conn with a framed.Reader.
// Big frames are not enabled to avoid unbounded allocations on reads.
func NewFramedReader(conn net.Conn) *framed.Reader {
r := framed.NewReader(conn)
return r
}

Comment on lines +73 to +84
reader := socketdriver.NewFramedReader(conn)

for {
// wait for readable conn
if err := pubsub.ConnReadCheck(conn); err != nil {
if err != io.EOF {
log.Errorf("serveConnection: Error on connReadCheck: %s",
err)
}
break
}

buf, doneFunc := socketdriver.GetBuffer()
defer doneFunc()

count, err := conn.Read(buf)
frame, err := reader.ReadFrame()
if err != nil {
if err != io.EOF {
log.Errorf("serveConnection: Error on read: %s",
err)
}
break
}
retChan <- string(buf[:count])
retChan <- string(frame)

Copilot AI Mar 2, 2026


serveConnection uses framed.Reader.ReadFrame(), which can allocate arbitrarily large buffers based on the peer-provided length prefix (big frames are enabled). Since this listener accepts local connections, a buggy/malicious publisher can DoS the subscriber via OOM. Use a bounded read (enforce socketdriver maxsize) before allocating/reading the payload.

Comment on lines +46 to +49
// With framed big frames (32-bit length prefix) the IPC message size
// limit is ~4GB, so the old 65KB boundary tests no longer apply.
// The "large tag" mechanism (pubsub-large-*) still has its own 1MB
// limit enforced by large.go, so those cases remain.

Copilot AI Mar 2, 2026


TestCheckMaxSize no longer exercises the driver maxsize boundary after increasing it to 10MB. To avoid regressions, add at least one case that exceeds the new maxsize (e.g., ~11–12MB payload after base64 overhead) and assert that CheckMaxSize returns an error.

Comment on lines +269 to 272
frame, err := s.reader.ReadFrame()
if err != nil {
errStr := fmt.Sprintf("connectAndRead(%s): sock read failed %s",
s.name, err)

Copilot AI Mar 2, 2026


Using framed.Reader.ReadFrame() is unsafe here because the vendored framed implementation allocates a buffer based solely on the on-wire length prefix and does not enforce any maximum. With big frames enabled, a malformed or malicious peer can trigger very large allocations (OOM) before any pubsub-level size checks run. Prefer reading the length prefix, rejecting frames > maxsize (10MB), and then reading into a bounded/reusable buffer (e.g., use framed.Reader.Read() with a preallocated maxsize buffer or implement a capped ReadFrame helper).

Comment on lines 290 to 295
// Read request
res, err := conn.Read(buf)
frame, err := reader.ReadFrame()
if err != nil {
// Peer process could have died
s.log.Errorf("serveConnection(%s/%d) error: %v", s.name, instance, err)
return

Copilot AI Mar 2, 2026


serveConnection reads the initial request using framed.Reader.ReadFrame(), which allocates according to the peer-provided length prefix with no upper bound. Even though the request should be small, a bad client can force large allocations and DoS the publisher. Consider reading the request with a fixed-size buffer (since the request is known to be small) or adding a max-frame-size check before allocating/reading the full payload.


@eriknordmark eriknordmark left a comment


According to https://github.com/getlantern/framed/blob/master/framed.go the max length is 65535, so this doesn't change anything except adding silent message truncation for larger messages.

If framed.go did support 4GByte messages, my question would be what the max memory usage would be if we started having pubsub use messages in the size range of megabytes. I see you removed the buffer pool, which means that memory usage would increase even if we stay below 65536, which is already a significant concern; it takes a while for the GC to kick in.

So I don't understand what problem we need to fix here - replacing teaching folks about a max size and how to use pubsub 'large' with additional OOM risk doesn't seem like an improvement to me.

@rucoder rucoder force-pushed the rucoder/pubsub-msg-size branch from 9ace3e8 to ce6c077 Compare March 2, 2026 16:07
@github-actions github-actions bot requested a review from eriknordmark March 2, 2026 16:07

rucoder commented Mar 2, 2026

According to https://github.com/getlantern/framed/blob/master/framed.go the max length is 65535, so this doesn't change anything except adding silent message truncation for larger messages.

no, we use EnableBigFrames to overcome this limitation

If framed.go did support 4GByte messages, my question would be what the max memory usage would be if we started having pubsub use messages in the size range of megabytes. I see you removed the buffer pool, which means that memory usage would increase even if we stay below 65536, which is already a significant concern; it takes a while for the GC to kick in.

So I don't understand what problem we need to fix here - replacing teaching folks about a max size and how to use pubsub 'large' with additional OOM risk doesn't seem like an improvement to me.

When we use pubsub-large we load the message into RAM one way or another. You are right that framed doesn't increase the size by itself: it sends a single message as fragments and then reassembles them into the full packet. The idea was to handle messages that are not annotated as "pubsub-large" but suddenly become >64k in size. This is exactly what we had recently when 20 SR-IOV VFs were enabled.

@eriknordmark since pubsub-large is still in place, only messages that accidentally exceed the size limit will benefit. To avoid the concern about GC pressure I'll revert to a buffer allocated on the client side. Basically the same behavior for messages <=64k, growing the buffer if necessary for bigger messages.


Copilot AI left a comment


Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.



Comment on lines +48 to +50
// We add new test cases that exceed the 10MB limit to verify it.
// The "large tag" mechanism (pubsub-large-*) still has its own 1MB
// limit enforced by large.go, so those cases remain.

Copilot AI Mar 2, 2026


The comment says the new test cases “exceed the 10MB limit”, but these cases use 8 * 1024 * 1024. If the intent is to exceed the 10MB maxsize after base64/message overhead, please call that out explicitly (otherwise consider using a size that is unambiguously over the limit).

Suggested change
// We add new test cases that exceed the 10MB limit to verify it.
// The "large tag" mechanism (pubsub-large-*) still has its own 1MB
// limit enforced by large.go, so those cases remain.
// We add new test cases with 8MiB payloads; while the raw payload size
// is below 10MB, base64 encoding and message framing overhead push the
// resulting IPC message size over the effective 10MB maxsize we verify.

Comment on lines 245 to 266
// NewFramedReader wraps a net.Conn with a framed.Reader that uses big frames
// (32-bit length prefix). Callers must use readFrame() instead of calling
// ReadFrame() directly, to enforce the maxsize limit and prevent unbounded
// memory allocation from a buggy peer.
func NewFramedReader(conn net.Conn) *framed.Reader {
r := framed.NewReader(conn)
r.EnableBigFrames()
return r
}

// logs a message if the allocation changed
var lastLoggedAllocated uint32

func maybeLogAllocated(log *base.LogObject) {
currentAllocated := atomic.LoadUint32(&allocated)
currentLastAllocated := atomic.LoadUint32(&lastLoggedAllocated)
if currentLastAllocated == currentAllocated {
return
// ReadFrame reads a single frame from the framed.Reader and rejects frames
// that exceed maxsize. This protects against unbounded memory allocation
// from a buggy or malicious peer sending a large length prefix.
func ReadFrame(r *framed.Reader) ([]byte, error) {
frame, err := r.ReadFrame()
if err != nil {
return nil, err
}
if len(frame) > maxsize {
return nil, fmt.Errorf("received frame of %d bytes exceeds max %d",
len(frame), maxsize)
}

Copilot AI Mar 2, 2026


ReadFrame() currently calls r.ReadFrame() and only then checks len(frame) > maxsize. In getlantern/framed, Reader.ReadFrame() allocates make([]byte, n) based on the length prefix before returning, so this code does not prevent unbounded memory allocation / OOM from a large length prefix. To actually enforce maxsize, read and validate the length prefix first (before allocating) or use a bounded read strategy and close/drain the connection on oversize frames. Also, the doc comment above NewFramedReader refers to readFrame() but the exported helper is ReadFrame().


codecov bot commented Mar 2, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 29.49%. Comparing base (2281599) to head (dae3b3a).
⚠️ Report is 331 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5644      +/-   ##
==========================================
+ Coverage   19.52%   29.49%   +9.96%     
==========================================
  Files          19       18       -1     
  Lines        3021     2417     -604     
==========================================
+ Hits          590      713     +123     
+ Misses       2310     1552     -758     
- Partials      121      152      +31     



rucoder commented Mar 2, 2026

@eriknordmark @rene it would be interesting to kick off the eden tests to see how it works

@rucoder rucoder marked this pull request as draft March 2, 2026 17:45
@eriknordmark

When we use pubsub-large we load the message into RAM one way or another. You are right that framed doesn't increase the size by itself: it sends a single message as fragments and then reassembles them into the full packet. The idea was to handle messages that are not annotated as "pubsub-large" but suddenly become >64k in size. This is exactly what we had recently when 20 SR-IOV VFs were enabled.

Actually we don't know how many copies of that 10 MByte we will end up with in memory when sending and receiving it. I see at least one additional copy (where it gets sent over a channel).

@eriknordmark since pubsub-large is still in place, only messages that accidentally exceed the size limit will benefit. To avoid the concern about GC pressure I'll revert to a buffer allocated on the client side. Basically the same behavior for messages <=64k, growing the buffer if necessary for bigger messages.

I think it would be a bad idea to replace the annoying but quickly detected log.Fatal when trying a new configuration (such as the above more VF case) with an increased risk that devices will hit an OOM in production based on unpredictable load conditions.

Replace raw unixpacket sockets with unix stream sockets using the
getlantern/framed library with 32-bit length-prefixed framing. This
removes the 65KB (16-bit) message size limitation that has been a
long-standing issue, raising the effective limit to 10MB.

Changes:
- socketdriver: switch unixpacket to unix stream sockets
- socketdriver: use framed.Reader/Writer with EnableBigFrames()
- socketdriver: raise maxsize from 65535 to 10MB
- socketdriver: remove buffer pool (GetBuffer/bufPool) and
  allocation tracking, no longer needed with framed
- reverse pubsub: same treatment (unix + framed)
- Remove ConnReadCheck usage (framed ReadFrame blocks properly)
- Update CheckMaxSize tests for new limit

Signed-off-by: Mikhail Malyshev <mike.malyshev@gmail.com>
@rucoder rucoder force-pushed the rucoder/pubsub-msg-size branch 4 times, most recently from 08dc2be to 3ddf8a5 Compare March 6, 2026 10:57
@github-actions github-actions bot requested a review from jsfakian March 6, 2026 10:57
@rucoder rucoder force-pushed the rucoder/pubsub-msg-size branch 2 times, most recently from c6ea46e to 29d0798 Compare March 6, 2026 11:09

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 9 comments.



)

with open(path, newline="", encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)

Copilot AI Mar 6, 2026


parse_csv assumes the CSV has a header row (reader.fieldnames is non-nil). If the file exists but is empty/partial (e.g. stats enabled briefly), this will raise an exception before you can print a helpful error. Consider checking reader.fieldnames for nil/empty and returning a user-friendly message.

Suggested change
reader = csv.DictReader(csvfile)
reader = csv.DictReader(csvfile)
# Handle empty or malformed CSV files without a header row.
if not reader.fieldnames:
print(
f"Error: CSV file '{path}' is empty or missing a header row; nothing to plot.",
file=sys.stderr,
)
return {}, []

Comment on lines 76 to 93
"File too large": {
agentName: "",
// agentScope: "testscope1",
stringSize: 49122,
stringSize: 8 * 1024 * 1024,
expectFail: true,
},
"IPC too large": {
agentName: "testagent1",
// agentScope: "testscope",
stringSize: 49122,
stringSize: 8 * 1024 * 1024,
expectFail: true,
},
"IPC with persistent too large": {
agentName: "testagent2",
// agentScope: "testscope",
persistent: true,
stringSize: 49122,
stringSize: 8 * 1024 * 1024,
expectFail: true,

Copilot AI Mar 6, 2026


The new oversize test inputs use 8MB strings to trigger the 10MB framed limit. Note that the test later allocates and fills these buffers byte-by-byte, which can add noticeable runtime/CPU to unit tests. Consider either lowering the oversize payload while still deterministically exceeding maxsize after JSON+base64 overhead, or constructing the data more efficiently to keep tests fast.

Comment on lines +309 to +318
var (
globalStatsMu sync.Mutex
globalReaderStats = make(map[string]*readerStats) // key is topic name
)

// registerReader registers a FrameReader's stats under the given topic.
func registerReader(topic string, stats *readerStats) {
globalStatsMu.Lock()
defer globalStatsMu.Unlock()
globalReaderStats[topic] = stats

Copilot AI Mar 6, 2026


The globalReaderStats registry is keyed only by topic name, but a single process can have multiple subscriptions/readers for the same topic (e.g. nim subscribes to types.DevicePortConfig from multiple agents). This causes stats entries to be overwritten and Close() from one reader to delete stats for the others. Use a unique key (e.g. publisher agent+topic, socket path, or a generated reader ID) and keep the human-readable topic as a separate field in the exported snapshot.

Comment on lines +498 to +515
func StartStatsLogger(log *base.LogObject, interval time.Duration, csvPath string) func() {
done := make(chan struct{})
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := writeStatsCSV(csvPath); err != nil {
log.Errorf("FrameReader stats CSV: %v", err)
}
LogReaderStats(log)
case <-done:
return
}
}
}()
return func() { close(done) }

Copilot AI Mar 6, 2026


StartStatsLogger calls time.NewTicker(interval) without validating interval; time.NewTicker panics for <=0 durations. Since this is an exported helper, guard against non-positive intervals (e.g., return a no-op stop func, or treat <=0 as disabled) to avoid accidental panics from callers.

}
s.sock = sock
s.writer = NewFramedWriter(sock)
s.reader = NewFrameReader(sock, s.topic)

Copilot AI Mar 6, 2026


FrameReader is registered under s.topic only. Because the same process can subscribe to the same TopicImpl from different publishers (same topic name but different AgentName), this will collide in the stats registry and make start/stop/unregister behave incorrectly for instrumentation. Consider passing a unique identifier to NewFrameReader (e.g. s.name+"/"+s.topic or s.sockName) and keeping the topic separately for grouping in reports.

Suggested change
s.reader = NewFrameReader(sock, s.topic)
s.reader = NewFrameReader(sock, s.name+"/"+s.topic)

sys.exit(1)

if output:
matplotlib.use("Agg")

Copilot AI Mar 6, 2026


matplotlib backend is switched to "Agg" after pyplot has already been imported at module import time. matplotlib requires selecting the backend before importing matplotlib.pyplot; otherwise -o may not work reliably (and can emit warnings/errors). Consider moving the pyplot import inside plot() after setting the backend, or use plt.switch_backend('Agg') when output is requested.

Suggested change
matplotlib.use("Agg")
plt.switch_backend("Agg")

Comment on lines +39 to +44
import numpy as np
_HAVE_MATPLOTLIB = True
except ImportError:
_HAVE_MATPLOTLIB = False



Copilot AI Mar 6, 2026


The ImportError handler treats any missing dependency as "matplotlib is required", but this block also imports numpy. If numpy is missing, the error message will be misleading. Consider adjusting the message to mention both (or separating the imports/except blocks).

Suggested change
import numpy as np
_HAVE_MATPLOTLIB = True
except ImportError:
_HAVE_MATPLOTLIB = False
_HAVE_MATPLOTLIB = True
except ImportError:
_HAVE_MATPLOTLIB = False
try:
import numpy as np
except ImportError:
# numpy is also required for plotting; treat its absence the same
# way as a missing matplotlib installation.
_HAVE_MATPLOTLIB = False

Comment on lines +405 to +444
// writeStatsCSV appends one row per topic plus a "__totals__" row to the given
// CSV file. The file is created with a header on first call.
// CSV columns:
//
// timestamp, topic, frames, bytes, grows, max_frame, buf_size,
// old_pool_mem, arena_mem, saved_bytes, arena_allocs,
// bucket_256B, bucket_1KB, bucket_4KB, bucket_16KB,
// bucket_64KB, bucket_256KB, bucket_1MB, bucket_10MB, bucket_over
func writeStatsCSV(filePath string) error {
entries := GetAllReaderStats()
if len(entries) == 0 {
return nil
}
sort.Slice(entries, func(i, j int) bool {
return entries[i].Topic < entries[j].Topic
})

needHeader := false
if _, err := os.Stat(filePath); os.IsNotExist(err) {
needHeader = true
}

f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("writeStatsCSV: open %s: %w", filePath, err)
}
defer f.Close()

w := csv.NewWriter(f)
defer w.Flush()

if needHeader {
header := []string{
"timestamp", "topic", "frames", "bytes", "grows",
"max_frame", "buf_size", "old_pool_mem", "arena_mem",
"saved_bytes", "arena_allocs",
}
for _, label := range frameSizeBucketLabels {
header = append(header, "bucket_"+label)
}

Copilot AI Mar 6, 2026


The CSV header labels written here are derived from frameSizeBucketLabels (e.g. "bucket_<=256B"), but the comment above still documents different column names (bucket_256B, bucket_over, etc.). Please update the comment to match the actual output to avoid breaking parsers/tools that rely on the documented schema.

configItemSpecMap.AddStringItem(AppBootOrder, "", validateBootOrder)
configItemSpecMap.AddStringItem(TUIMonitorLogLevel, "info", blankValidator)
configItemSpecMap.AddStringItem(EdgeviewPublicKeys, "", blankValidator)
configItemSpecMap.AddIntItem(PubsubStatsInterval, 1, 0, 3600) // tmp: default 1s for profiling run

Copilot AI Mar 6, 2026


This sets PubsubStatsInterval default to 1s, which enables the stats logger by default and will continuously log + append to /persist on all devices. The PR description indicates this should be off by default; please change the default back to 0 (disabled) before merge, leaving the feature controllable via the controller key.

Copilot uses AI. Check for mistakes.
@rucoder
Contributor Author

rucoder commented Mar 6, 2026

@eriknordmark I think I introduced a lot of confusion by mentioning SR-IOV; I meant the issue that was reported recently after the SR-IOV fixes were merged -- some pubsub struct grew beyond 65 KB and was not marked as pubsub-large. Anyway, please have a look at the updated PR. I added an arena allocator and statistics collection. I still need to test on a real device with apps, many networks, etc.
@rene could you run ztaf again (I keep forgetting how to do so)?

Copilot AI left a comment

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.

Comment on lines +4 to +13
"""
Plot FrameReader statistics from CSV produced by socketdriver.StartStatsLogger.

Usage:
# Copy CSV from device:
scp device:/persist/pubsub-frame-stats.csv .

# Plot all graphs:
python3 plot_frame_stats.py pubsub-frame-stats.csv

Copilot AI Mar 6, 2026

The PR description/testing instructions refer to a plot script under pkg/pillar/pubsub/socketdriver/plot-frame-stats.py, but this PR adds tools/plot_frame_stats.py (different path/name). Please align the documentation/instructions (and any packaging/install expectations) so users can find and run the script as documented.
Comment on lines +571 to +579
// Read 4-byte length header
if _, err := io.ReadFull(fr.r, fr.hdr[:]); err != nil {
	return nil, err
}
n := int(binary.LittleEndian.Uint32(fr.hdr[:]))
if n > maxsize {
	return nil, fmt.Errorf("received frame of %d bytes exceeds max %d",
		n, maxsize)
}
Copilot AI Mar 6, 2026

FrameReader.ReadFrame converts the 32-bit length prefix to int before validating. On 32-bit platforms, a length > math.MaxInt32 will wrap to a negative int, bypass the maxsize check, and then panic when slicing fr.buf[:n]. Validate the length as a uint32 (and ensure it fits into int) before converting, and explicitly reject negative/overflow cases.
configItemSpecMap.AddStringItem(AppBootOrder, "", validateBootOrder)
configItemSpecMap.AddStringItem(TUIMonitorLogLevel, "info", blankValidator)
configItemSpecMap.AddStringItem(EdgeviewPublicKeys, "", blankValidator)
configItemSpecMap.AddIntItem(PubsubStatsInterval, 1, 0, 3600) // tmp: default 1s for profiling run
Copilot AI Mar 6, 2026

PubsubStatsInterval is added as a debug knob, but the default is currently set to 1s (commented as temporary). This will enable periodic stats logging by default on all devices, causing unnecessary CPU/disk writes to /persist. Please set the default back to 0 (disabled) before merge, and keep profiling defaults out of the main branch (or gate behind a build/debug flag).

Suggested change
configItemSpecMap.AddIntItem(PubsubStatsInterval, 1, 0, 3600) // tmp: default 1s for profiling run
configItemSpecMap.AddIntItem(PubsubStatsInterval, 0, 0, 3600) // default 0s (disabled) to avoid unnecessary profiling

// StartStatsLogger starts a goroutine that periodically writes FrameReader
// statistics to a CSV file and optionally also logs them.
// csvPath is the output CSV file (e.g. "/persist/pubsub-stats.csv").
Copilot AI Mar 6, 2026

StartStatsLogger doc comment mentions csvPath example "/persist/pubsub-stats.csv", but the rest of this PR (zedbox statsCSVPath and plotter) uses "/persist/pubsub-frame-stats.csv". Please align the comment/example (or the filename) to avoid confusion when enabling/consuming stats.

Suggested change
// csvPath is the output CSV file (e.g. "/persist/pubsub-stats.csv").
// csvPath is the output CSV file (e.g. "/persist/pubsub-frame-stats.csv").
Comment on lines +73 to +74
reader := socketdriver.NewFrameReader(conn, name)
defer reader.Close()
Copilot AI Mar 6, 2026

serveConnection registers FrameReader stats using the socket path (sockName) as the key. Since startSubscriber accepts connections in a loop and spawns a goroutine per connection, multiple concurrent publishers will overwrite each other in the global stats map and unregistering one connection can delete stats still in use by another. Consider using a unique per-connection key (e.g., include an incrementing instance or conn.RemoteAddr()), or use a stats-free reader here if reverse pubsub stats aren't needed.
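The incrementing-instance suggestion can be sketched with an atomic sequence counter appended to the socket path, so two concurrent connections never share a registry key. statsKey and connSeq are hypothetical names, not the PR's actual API:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// connSeq yields a unique suffix per accepted connection, so concurrent
// publishers on the same socket get distinct entries in a stats registry
// and unregistering one cannot delete another's live entry.
var connSeq atomic.Uint64

// statsKey builds a per-connection registry key from the socket path.
func statsKey(sockName string) string {
	return fmt.Sprintf("%s#%d", sockName, connSeq.Add(1))
}

func main() {
	fmt.Println(statsKey("/run/example.sock")) // /run/example.sock#1
	fmt.Println(statsKey("/run/example.sock")) // /run/example.sock#2
}
```

Keying by conn.RemoteAddr() alone is weaker for unix sockets, since unnamed client sockets often report an empty address; a counter stays unique regardless.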
Address review feedback:

- Add FrameReader with a reusable buffer that starts at 1KB and grows
  with 25% headroom as needed (up to maxsize). This avoids per-read
  allocations from framed.ReadFrame() while using less memory than
  the old fixed 65KB buffer pool for small-message topics.
- Enforce maxsize (10MB) on both send and receive paths: send methods
  return an error if the message exceeds maxsize, FrameReader rejects
  frames with a length prefix exceeding maxsize before allocating.
- Fix CheckMaxSize comment to reflect the actual 10MB limit.
- Update tests: restore "too large" test cases with 8MB strings
  that exceed the 10MB limit after base64 encoding.

Signed-off-by: Mikhail Malyshev <mike.malyshev@gmail.com>
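The growth policy this commit describes (start at 1 KB, 25% headroom, never shrink, capped at maxsize) can be sketched as follows; grow, initialBuf, and maxSize are illustrative names under those stated assumptions, not the FrameReader's actual fields:

```go
package main

import "fmt"

const (
	initialBuf = 1024             // starting arena size per reader
	maxSize    = 10 * 1024 * 1024 // application-level frame limit
)

// grow returns a slice of length need, reusing buf's backing array when
// it fits and otherwise allocating need plus 25% headroom (capped at
// maxSize). The arena never shrinks: a large frame leaves a large cap.
// No copy is needed because each frame is read fresh into the buffer.
func grow(buf []byte, need int) []byte {
	if need <= cap(buf) {
		return buf[:need]
	}
	newCap := need + need/4 // 25% headroom over the requested size
	if newCap > maxSize {
		newCap = maxSize
	}
	return make([]byte, need, newCap)
}

func main() {
	buf := make([]byte, 0, initialBuf)
	buf = grow(buf, 500)
	fmt.Println(len(buf), cap(buf)) // 500 1024

	buf = grow(buf, 4000)
	fmt.Println(len(buf), cap(buf)) // 4000 5000

	buf = grow(buf, 4500) // fits in the headroom, no reallocation
	fmt.Println(len(buf), cap(buf)) // 4500 5000
}
```

The headroom is what turns a stream of slowly growing messages into O(log n) allocations instead of one per frame, while small-message topics stay near the 1 KB floor instead of the old fixed 65 KB pool buffers.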
@rucoder rucoder force-pushed the rucoder/pubsub-msg-size branch 2 times, most recently from 30eaae6 to 707658f Compare March 6, 2026 17:49
Add optional per-topic FrameReader instrumentation: atomic counters
(frames, bytes, grows, max frame size) and a 9-bucket size histogram
per reader. A global registry maps topic names to stats entries.

StartStatsLogger writes a CSV to /persist/pubsub-frame-stats.csv at
a configurable interval; tools/plot-frame-stats.py visualises the
output with four panels (memory savings, per-topic buffer sizes,
reallocation pressure, frame size distribution).

Stats collection is disabled by default and controlled at runtime via
the debug.pubsub.stats.interval GlobalConfig key (value in seconds;
0 = off). zedbox subscribes to ConfigItemValueMap published by
zedagent and calls StartStatsLogger / stop when the key changes.

Use newFrameReaderInternal (no stats registration) for the publisher's
single-frame handshake read so it does not pollute the topic stats map.
Bucket counters use sync/atomic to avoid a data race between the reader
goroutine and the stats logger goroutine.

Signed-off-by: Mikhail Malyshev <mike.malyshev@gmail.com>
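The atomic counters and 9-bucket histogram this commit describes could look roughly like the sketch below. The bucket boundaries (256 B through 10 MB plus an overflow bucket) and the frameStats type are assumptions based on the commit message, not the PR's actual code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Upper bounds for the first eight histogram buckets; anything larger
// than 10 MB lands in the ninth (overflow) bucket.
var bucketBounds = []int{256, 1 << 10, 4 << 10, 16 << 10,
	64 << 10, 256 << 10, 1 << 20, 10 << 20}

// frameStats holds per-topic counters. sync/atomic types let the reader
// goroutine update them while the stats logger goroutine reads them,
// without a mutex and without a data race.
type frameStats struct {
	frames  atomic.Uint64
	bytes   atomic.Uint64
	buckets [9]atomic.Uint64
}

// record accounts for one received frame of n bytes.
func (s *frameStats) record(n int) {
	s.frames.Add(1)
	s.bytes.Add(uint64(n))
	i := 0
	for i < len(bucketBounds) && n > bucketBounds[i] {
		i++
	}
	s.buckets[i].Add(1)
}

func main() {
	var s frameStats
	s.record(100)      // <=256B bucket
	s.record(2048)     // <=4KB bucket
	s.record(20 << 20) // overflow bucket
	fmt.Println(s.frames.Load(), s.buckets[0].Load(),
		s.buckets[2].Load(), s.buckets[8].Load()) // 3 1 1 1
}
```

A global registry would then map topic name (or a per-connection key) to a *frameStats, and the CSV logger would snapshot each entry with Load() at every interval.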
@rucoder rucoder force-pushed the rucoder/pubsub-msg-size branch from 707658f to a295923 Compare March 7, 2026 13:42