pubsub: switch from unixpacket to framed stream sockets#5644
rucoder wants to merge 3 commits into lf-edge:master
Conversation
fd55ae3 to 9ace3e8
@eriknordmark, @milan-zededa, please take a closer look at this PR, since it touches critical pubsub infrastructure.

@rene @eriknordmark @michel-zededa I did not touch "pubsub-large" yet; this is an experimental PR. I'm 99% sure it works, but let's test it.
Pull request overview
This PR updates pubsub's IPC transport to remove the historical ~65KB message size ceiling by switching from unixpacket sockets to unix stream sockets and introducing length-prefixed framing using the vendored getlantern/framed library with big frames enabled.
Changes:
- Switch pubsub socket transport from unixpacket to unix stream sockets.
- Wrap IPC reads/writes with framed.Reader/framed.Writer using EnableBigFrames().
- Increase the socketdriver maxsize limit to 10MB and update related tests/comments.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| pkg/pillar/pubsub/socketdriver/subscribe.go | Subscriber now dials unix and reads framed messages. |
| pkg/pillar/pubsub/socketdriver/publish.go | Publisher now accepts unix stream conns, reads framed requests, and writes framed updates. |
| pkg/pillar/pubsub/socketdriver/driver.go | Raises maxsize to 10MB; adds framed reader/writer helpers and removes buffer pool. |
| pkg/pillar/pubsub/reverse/subscribe.go | Reverse subscriber now listens on unix and reads framed messages. |
| pkg/pillar/pubsub/reverse/publish.go | Reverse publisher now dials unix and writes framed messages. |
| pkg/pillar/pubsub/checkmaxsize_test.go | Updates test commentary/cases to reflect removal of the old ~65KB boundary behavior. |
```diff
 s.log.Tracef("sendUpdate(%s): key %s\n", s.name, key)
 // base64-encode to avoid having spaces in the key and val
 sendKey := base64.StdEncoding.EncodeToString([]byte(key))
 sendVal := base64.StdEncoding.EncodeToString(val)
 buf := fmt.Sprintf("update %s %s %s", s.topic, sendKey, sendVal)
-if len(buf) >= maxsize {
-	s.log.Fatalf("Too large message (%d bytes) sent to %s topic %s key %s",
-		len(buf), s.name, s.topic, key)
-}
-_, err := sock.Write([]byte(buf))
+_, err := writer.Write([]byte(buf))
 return err
```

These send* methods no longer enforce the driver maxsize limit. Since PublicationImpl.Publish() does not call CheckMaxSize(), oversized values can now be sent over IPC, bypassing the intended 10MB cap and potentially causing large allocations/pressure on subscribers. Reintroduce a size check (ideally returning an error rather than Fatal) before writer.Write(), or enforce maxsize at publish/serialize time for all outgoing messages.
```go
// CheckMaxSize returns an error if too large.
// With framed big frames the limit is ~4GB, so this is unlikely to trigger.
```

The comment on CheckMaxSize is misleading: the framed transport may support ~4GB frames, but this driver still enforces maxsize (currently 10MB) and this function can definitely trigger. Please update the comment to reflect the actual enforced limit and its purpose (protecting memory/IPC).

Suggested change:
```go
// CheckMaxSize returns an error if the serialized message exceeds maxsize.
// Even though the framed transport can support ~4GB frames, this driver enforces
// a much smaller application-level limit (currently 10MB) to protect memory/IPC.
```
```go
// NewFramedReader wraps a net.Conn with a framed.Reader that uses big frames
// (32-bit length prefix), allowing messages up to ~4GB.
func NewFramedReader(conn net.Conn) *framed.Reader {
	r := framed.NewReader(conn)
	r.EnableBigFrames()
	return r
}
```

NewFramedReader/NewFramedWriter enable big frames (~4GB), but the framed.Reader.ReadFrame() API allocates based on the length prefix and does not enforce a maximum on reads (only writes check maxFrameLength). Given this package defines maxsize (10MB), consider centralizing a bounded read helper in socketdriver (e.g., read length, reject > maxsize, then read) so all IPC consumers are protected from oversized/malicious frames.
Suggested change:
```go
// NewFramedReader wraps a net.Conn with a framed.Reader.
// Big frames are not enabled to avoid unbounded allocations on reads.
func NewFramedReader(conn net.Conn) *framed.Reader {
	r := framed.NewReader(conn)
	return r
}
```
```diff
 reader := socketdriver.NewFramedReader(conn)

 for {
-	// wait for readable conn
-	if err := pubsub.ConnReadCheck(conn); err != nil {
-		if err != io.EOF {
-			log.Errorf("serveConnection: Error on connReadCheck: %s",
-				err)
-		}
-		break
-	}
-
-	buf, doneFunc := socketdriver.GetBuffer()
-	defer doneFunc()
-
-	count, err := conn.Read(buf)
+	frame, err := reader.ReadFrame()
 	if err != nil {
 		if err != io.EOF {
 			log.Errorf("serveConnection: Error on read: %s",
 				err)
 		}
 		break
 	}
-	retChan <- string(buf[:count])
+	retChan <- string(frame)
```

serveConnection uses framed.Reader.ReadFrame(), which can allocate arbitrarily large buffers based on the peer-provided length prefix (big frames are enabled). Since this listener accepts local connections, a buggy/malicious publisher can DoS the subscriber via OOM. Use a bounded read (enforce socketdriver maxsize) before allocating/reading the payload.
```go
// With framed big frames (32-bit length prefix) the IPC message size
// limit is ~4GB, so the old 65KB boundary tests no longer apply.
// The "large tag" mechanism (pubsub-large-*) still has its own 1MB
// limit enforced by large.go, so those cases remain.
```

TestCheckMaxSize no longer exercises the driver maxsize boundary after increasing it to 10MB. To avoid regressions, add at least one case that exceeds the new maxsize (e.g., ~11–12MB payload after base64 overhead) and assert that CheckMaxSize returns an error.
```go
frame, err := s.reader.ReadFrame()
if err != nil {
	errStr := fmt.Sprintf("connectAndRead(%s): sock read failed %s",
		s.name, err)
```

Using framed.Reader.ReadFrame() is unsafe here because the vendored framed implementation allocates a buffer based solely on the on-wire length prefix and does not enforce any maximum. With big frames enabled, a malformed or malicious peer can trigger very large allocations (OOM) before any pubsub-level size checks run. Prefer reading the length prefix, rejecting frames > maxsize (10MB), and then reading into a bounded/reusable buffer (e.g., use framed.Reader.Read() with a preallocated maxsize buffer or implement a capped ReadFrame helper).
```diff
 // Read request
-res, err := conn.Read(buf)
+frame, err := reader.ReadFrame()
 if err != nil {
 	// Peer process could have died
 	s.log.Errorf("serveConnection(%s/%d) error: %v", s.name, instance, err)
 	return
```

serveConnection reads the initial request using framed.Reader.ReadFrame(), which allocates according to the peer-provided length prefix with no upper bound. Even though the request should be small, a bad client can force large allocations and DoS the publisher. Consider reading the request with a fixed-size buffer (since the request is known to be small) or adding a max-frame-size check before allocating/reading the full payload.
eriknordmark
left a comment
According to https://github.com/getlantern/framed/blob/master/framed.go the max length is 65535, so this doesn't change anything but adds silent message truncation for larger messages.
If framed.go did support 4GB messages, my question would be what the maximum memory usage would be if pubsub started using messages in the megabyte range. I see you removed the buffer pool, which means that memory usage would increase even if we stay below 65536; that is already a significant concern, since it takes a while for the GC to kick in.
So I don't understand what problem we need to fix here: replacing teaching folks about a max size and how to use pubsub "large" with additional OOM risk doesn't seem like an improvement to me.
9ace3e8 to ce6c077
no, we use
When we use pubsub-large we load the message into RAM one way or another. You are right, framed doesn't increase the size limit by itself; it sends a single message as fragments and then reassembles them into the full packet. The idea was to handle messages that are not annotated as "pubsub-large" but suddenly grow beyond 64KB in size. This is exactly what we hit recently when 20 SR-IOV VFs were enabled. @eriknordmark since pubsub-large is still in place, only messages that accidentally exceed the size limit will benefit. To address the concern about GC pressure I'll revert to a buffer allocated on the client side: basically the same behavior for messages <=64KB, growing the buffer if necessary for bigger messages.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
```go
// We add new test cases that exceed the 10MB limit to verify it.
// The "large tag" mechanism (pubsub-large-*) still has its own 1MB
// limit enforced by large.go, so those cases remain.
```

The comment says the new test cases "exceed the 10MB limit", but these cases use 8 * 1024 * 1024. If the intent is to exceed the 10MB maxsize after base64/message overhead, please call that out explicitly (otherwise consider using a size that is unambiguously over the limit).

Suggested change:
```go
// We add new test cases with 8MiB payloads; while the raw payload size
// is below 10MB, base64 encoding and message framing overhead push the
// resulting IPC message size over the effective 10MB maxsize we verify.
```
```diff
 // NewFramedReader wraps a net.Conn with a framed.Reader that uses big frames
 // (32-bit length prefix). Callers must use readFrame() instead of calling
 // ReadFrame() directly, to enforce the maxsize limit and prevent unbounded
 // memory allocation from a buggy peer.
 func NewFramedReader(conn net.Conn) *framed.Reader {
 	r := framed.NewReader(conn)
 	r.EnableBigFrames()
 	return r
 }

-// logs a message if the allocation changed
-var lastLoggedAllocated uint32
-
-func maybeLogAllocated(log *base.LogObject) {
-	currentAllocated := atomic.LoadUint32(&allocated)
-	currentLastAllocated := atomic.LoadUint32(&lastLoggedAllocated)
-	if currentLastAllocated == currentAllocated {
-		return
+// ReadFrame reads a single frame from the framed.Reader and rejects frames
+// that exceed maxsize. This protects against unbounded memory allocation
+// from a buggy or malicious peer sending a large length prefix.
+func ReadFrame(r *framed.Reader) ([]byte, error) {
+	frame, err := r.ReadFrame()
+	if err != nil {
+		return nil, err
+	}
+	if len(frame) > maxsize {
+		return nil, fmt.Errorf("received frame of %d bytes exceeds max %d",
+			len(frame), maxsize)
+	}
```

ReadFrame() currently calls r.ReadFrame() and only then checks len(frame) > maxsize. In getlantern/framed, Reader.ReadFrame() allocates make([]byte, n) based on the length prefix before returning, so this code does not prevent unbounded memory allocation / OOM from a large length prefix. To actually enforce maxsize, read and validate the length prefix first (before allocating) or use a bounded read strategy and close/drain the connection on oversize frames. Also, the doc comment above NewFramedReader refers to readFrame() but the exported helper is ReadFrame().
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##           master    #5644      +/-   ##
==========================================
+ Coverage   19.52%   29.49%   +9.96%
==========================================
  Files          19       18       -1
  Lines        3021     2417     -604
==========================================
+ Hits          590      713     +123
+ Misses       2310     1552     -758
- Partials      121      152      +31
```

☔ View full report in Codecov by Sentry.
@eriknordmark @rene it would be interesting to kick off the eden tests to see how it works.
Actually, we don't know how many copies of that 10MB we will end up with in memory when sending and receiving it.
I think it would be a bad idea to replace the annoying but quickly detected log.Fatal when trying a new configuration (such as the more-VFs case above) with an increased risk that devices will hit an OOM in production under unpredictable load conditions.
Replace raw unixpacket sockets with unix stream sockets using the getlantern/framed library with 32-bit length-prefixed framing. This removes the 65KB (16-bit) message size limitation that has been a long-standing issue, raising the effective limit to 10MB.

Changes:
- socketdriver: switch unixpacket to unix stream sockets
- socketdriver: use framed.Reader/Writer with EnableBigFrames()
- socketdriver: raise maxsize from 65535 to 10MB
- socketdriver: remove buffer pool (GetBuffer/bufPool) and allocation tracking, no longer needed with framed
- reverse pubsub: same treatment (unix + framed)
- Remove ConnReadCheck usage (framed ReadFrame blocks properly)
- Update CheckMaxSize tests for new limit

Signed-off-by: Mikhail Malyshev <mike.malyshev@gmail.com>
08dc2be to 3ddf8a5
c6ea46e to 29d0798
29d0798 to 9a379a1
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 9 comments.
```python
)

with open(path, newline="", encoding="utf-8") as csvfile:
    reader = csv.DictReader(csvfile)
```

parse_csv assumes the CSV has a header row (reader.fieldnames is non-None). If the file exists but is empty/partial (e.g. stats enabled briefly), this will raise an exception before you can print a helpful error. Consider checking reader.fieldnames for None/empty and returning a user-friendly message.

Suggested change:
```python
with open(path, newline="", encoding="utf-8") as csvfile:
    reader = csv.DictReader(csvfile)
    # Handle empty or malformed CSV files without a header row.
    if not reader.fieldnames:
        print(
            f"Error: CSV file '{path}' is empty or missing a header row; nothing to plot.",
            file=sys.stderr,
        )
        return {}, []
```
```diff
 "File too large": {
 	agentName: "",
 	// agentScope: "testscope1",
-	stringSize: 49122,
+	stringSize: 8 * 1024 * 1024,
 	expectFail: true,
 },
 "IPC too large": {
 	agentName: "testagent1",
 	// agentScope: "testscope",
-	stringSize: 49122,
+	stringSize: 8 * 1024 * 1024,
 	expectFail: true,
 },
 "IPC with persistent too large": {
 	agentName: "testagent2",
 	// agentScope: "testscope",
 	persistent: true,
-	stringSize: 49122,
+	stringSize: 8 * 1024 * 1024,
 	expectFail: true,
```

The new oversize test inputs use 8MB strings to trigger the 10MB framed limit. Note that the test later allocates and fills these buffers byte-by-byte, which can add noticeable runtime/CPU to unit tests. Consider either lowering the oversize payload while still deterministically exceeding maxsize after JSON+base64 overhead, or constructing the data more efficiently to keep tests fast.
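Whether an 8 MiB payload deterministically exceeds the limit after base64 can be checked with a little arithmetic. Assuming maxsize is 10 MiB and standard base64 (3 input bytes become 4 output bytes):

```go
package main

import "fmt"

func main() {
	const raw = 8 * 1024 * 1024      // 8 MiB raw test payload
	const maxsize = 10 * 1024 * 1024 // assumed driver limit
	// base64 output length: ceil(raw/3) * 4
	encoded := (raw + 2) / 3 * 4
	fmt.Println(encoded, encoded > maxsize) // 11184812 true
}
```

So the 8 MiB payload encodes to ~11.2 MB, which is over a 10 MiB cap even before framing/JSON overhead; it is only ambiguous if "10MB" means something else.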
```go
var (
	globalStatsMu     sync.Mutex
	globalReaderStats = make(map[string]*readerStats) // key is topic name
)

// registerReader registers a FrameReader's stats under the given topic.
func registerReader(topic string, stats *readerStats) {
	globalStatsMu.Lock()
	defer globalStatsMu.Unlock()
	globalReaderStats[topic] = stats
```

The globalReaderStats registry is keyed only by topic name, but a single process can have multiple subscriptions/readers for the same topic (e.g. nim subscribes to types.DevicePortConfig from multiple agents). This causes stats entries to be overwritten and Close() from one reader to delete stats for the others. Use a unique key (e.g. publisher agent+topic, socket path, or a generated reader ID) and keep the human-readable topic as a separate field in the exported snapshot.
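One way to get collision-free registry keys, along the lines the reviewer suggests, is to combine agent/topic with a process-wide counter. A sketch (readerKey and readerSeq are illustrative names, not the PR's code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// readerSeq gives each FrameReader a unique suffix so two subscriptions
// to the same topic no longer overwrite each other in the stats map.
var readerSeq atomic.Uint64

func readerKey(agent, topic string) string {
	return fmt.Sprintf("%s/%s#%d", agent, topic, readerSeq.Add(1))
}

func main() {
	a := readerKey("nim", "DevicePortConfig")
	b := readerKey("nim", "DevicePortConfig")
	fmt.Println(a, b, a != b) // nim/DevicePortConfig#1 nim/DevicePortConfig#2 true
}
```

The human-readable topic can then stay as a separate field in the exported snapshot for grouping in reports.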
```go
func StartStatsLogger(log *base.LogObject, interval time.Duration, csvPath string) func() {
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := writeStatsCSV(csvPath); err != nil {
					log.Errorf("FrameReader stats CSV: %v", err)
				}
				LogReaderStats(log)
			case <-done:
				return
			}
		}
	}()
	return func() { close(done) }
```

StartStatsLogger calls time.NewTicker(interval) without validating interval; time.NewTicker panics for <=0 durations. Since this is an exported helper, guard against non-positive intervals (e.g., return a no-op stop func, or treat <=0 as disabled) to avoid accidental panics from callers.
```go
}
s.sock = sock
s.writer = NewFramedWriter(sock)
s.reader = NewFrameReader(sock, s.topic)
```

FrameReader is registered under s.topic only. Because the same process can subscribe to the same TopicImpl from different publishers (same topic name but different AgentName), this will collide in the stats registry and make start/stop/unregister behave incorrectly for instrumentation. Consider passing a unique identifier to NewFrameReader (e.g. s.name+"/"+s.topic or s.sockName) and keeping the topic separately for grouping in reports.

Suggested change:
```go
s.reader = NewFrameReader(sock, s.name+"/"+s.topic)
```
tools/plot_frame_stats.py (outdated)

```python
    sys.exit(1)

if output:
    matplotlib.use("Agg")
```

The matplotlib backend is switched to "Agg" after pyplot has already been imported at module import time. matplotlib requires selecting the backend before importing matplotlib.pyplot; otherwise -o may not work reliably (and can emit warnings/errors). Consider moving the pyplot import inside plot() after setting the backend, or use plt.switch_backend('Agg') when output is requested.

Suggested change:
```python
plt.switch_backend("Agg")
```
tools/plot_frame_stats.py (outdated)

```python
    import numpy as np
    _HAVE_MATPLOTLIB = True
except ImportError:
    _HAVE_MATPLOTLIB = False
```

The ImportError handler treats any missing dependency as "matplotlib is required", but this block also imports numpy. If numpy is missing, the error message will be misleading. Consider adjusting the message to mention both (or separating the imports/except blocks).

Suggested change:
```python
    _HAVE_MATPLOTLIB = True
except ImportError:
    _HAVE_MATPLOTLIB = False
try:
    import numpy as np
except ImportError:
    # numpy is also required for plotting; treat its absence the same
    # way as a missing matplotlib installation.
    _HAVE_MATPLOTLIB = False
```
```go
// writeStatsCSV appends one row per topic plus a "__totals__" row to the given
// CSV file. The file is created with a header on first call.
// CSV columns:
//
//	timestamp, topic, frames, bytes, grows, max_frame, buf_size,
//	old_pool_mem, arena_mem, saved_bytes, arena_allocs,
//	bucket_256B, bucket_1KB, bucket_4KB, bucket_16KB,
//	bucket_64KB, bucket_256KB, bucket_1MB, bucket_10MB, bucket_over
func writeStatsCSV(filePath string) error {
	entries := GetAllReaderStats()
	if len(entries) == 0 {
		return nil
	}
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].Topic < entries[j].Topic
	})

	needHeader := false
	if _, err := os.Stat(filePath); os.IsNotExist(err) {
		needHeader = true
	}

	f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("writeStatsCSV: open %s: %w", filePath, err)
	}
	defer f.Close()

	w := csv.NewWriter(f)
	defer w.Flush()

	if needHeader {
		header := []string{
			"timestamp", "topic", "frames", "bytes", "grows",
			"max_frame", "buf_size", "old_pool_mem", "arena_mem",
			"saved_bytes", "arena_allocs",
		}
		for _, label := range frameSizeBucketLabels {
			header = append(header, "bucket_"+label)
		}
```

The CSV header labels written here are derived from frameSizeBucketLabels (e.g. "bucket_<=256B"), but the comment above still documents different column names (bucket_256B, bucket_over, etc.). Please update the comment to match the actual output to avoid breaking parsers/tools that rely on the documented schema.
```go
configItemSpecMap.AddStringItem(AppBootOrder, "", validateBootOrder)
configItemSpecMap.AddStringItem(TUIMonitorLogLevel, "info", blankValidator)
configItemSpecMap.AddStringItem(EdgeviewPublicKeys, "", blankValidator)
configItemSpecMap.AddIntItem(PubsubStatsInterval, 1, 0, 3600) // tmp: default 1s for profiling run
```

This sets the PubsubStatsInterval default to 1s, which enables the stats logger by default and will continuously log and append to /persist on all devices. The PR description indicates this should be off by default; please change the default back to 0 (disabled) before merge, leaving the feature controllable via the controller key.
@eriknordmark I think I introduced a lot of confusion by mentioning SR-IOV; I meant the issue that was reported recently after the SR-IOV fixes were merged: some pubsub struct grew beyond 65KB and it was not marked as pubsub-large. Anyway, please have a look at the updated PR. I added an arena allocator and statistics collection. I still need to test on a real device with apps + many networks, etc.
9a379a1 to dae3b3a
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
```python
"""
Plot FrameReader statistics from CSV produced by socketdriver.StartStatsLogger.

Usage:
    # Copy CSV from device:
    scp device:/persist/pubsub-frame-stats.csv .

    # Plot all graphs:
    python3 plot_frame_stats.py pubsub-frame-stats.csv
```

The PR description/testing instructions refer to a plot script under pkg/pillar/pubsub/socketdriver/plot-frame-stats.py, but this PR adds tools/plot_frame_stats.py (different path/name). Please align the documentation/instructions (and any packaging/install expectations) so users can find and run the script as documented.
```go
// Read 4-byte length header
if _, err := io.ReadFull(fr.r, fr.hdr[:]); err != nil {
	return nil, err
}
n := int(binary.LittleEndian.Uint32(fr.hdr[:]))
if n > maxsize {
	return nil, fmt.Errorf("received frame of %d bytes exceeds max %d",
		n, maxsize)
}
```

FrameReader.ReadFrame converts the 32-bit length prefix to int before validating. On 32-bit platforms, a length > math.MaxInt32 will wrap to a negative int, bypass the maxsize check, and then panic when slicing fr.buf[:n]. Validate the length as a uint32 (and ensure it fits into int) before converting, and explicitly reject negative/overflow cases.
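Validating the prefix while it is still unsigned avoids the 32-bit wraparound. A sketch, assuming maxsize is 10 MiB (frameLen is an illustrative helper, not the PR's code):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const maxsize = 10 * 1024 * 1024 // assumed driver limit

// frameLen validates the 32-bit length prefix as an unsigned value, so a
// prefix above MaxInt32 cannot wrap to a negative int on 32-bit builds.
func frameLen(hdr [4]byte) (int, error) {
	n32 := binary.LittleEndian.Uint32(hdr[:])
	if uint64(n32) > uint64(maxsize) {
		return 0, fmt.Errorf("frame of %d bytes exceeds max %d", n32, maxsize)
	}
	return int(n32), nil // safe: n32 <= maxsize fits in int everywhere
}

func main() {
	bad := [4]byte{0xff, 0xff, 0xff, 0xff} // ~4GB prefix
	_, err := frameLen(bad)
	fmt.Println(err != nil) // true

	good := [4]byte{0x00, 0x04, 0x00, 0x00} // little-endian 1024
	n, _ := frameLen(good)
	fmt.Println(n) // 1024
}
```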
```go
configItemSpecMap.AddStringItem(AppBootOrder, "", validateBootOrder)
configItemSpecMap.AddStringItem(TUIMonitorLogLevel, "info", blankValidator)
configItemSpecMap.AddStringItem(EdgeviewPublicKeys, "", blankValidator)
configItemSpecMap.AddIntItem(PubsubStatsInterval, 1, 0, 3600) // tmp: default 1s for profiling run
```

PubsubStatsInterval is added as a debug knob, but the default is currently set to 1s (commented as temporary). This will enable periodic stats logging by default on all devices, causing unnecessary CPU/disk writes to /persist. Please set the default back to 0 (disabled) before merge, and keep profiling defaults out of the main branch (or gate behind a build/debug flag).

Suggested change:
```go
configItemSpecMap.AddIntItem(PubsubStatsInterval, 0, 0, 3600) // default 0s (disabled) to avoid unnecessary profiling
```
```go
// StartStatsLogger starts a goroutine that periodically writes FrameReader
// statistics to a CSV file and optionally also logs them.
// csvPath is the output CSV file (e.g. "/persist/pubsub-stats.csv").
```

The StartStatsLogger doc comment gives the csvPath example "/persist/pubsub-stats.csv", but the rest of this PR (zedbox statsCSVPath and the plotter) uses "/persist/pubsub-frame-stats.csv". Please align the comment/example (or the filename) to avoid confusion when enabling/consuming stats.

Suggested change:
```go
// csvPath is the output CSV file (e.g. "/persist/pubsub-frame-stats.csv").
```
```go
reader := socketdriver.NewFrameReader(conn, name)
defer reader.Close()
```

serveConnection registers FrameReader stats using the socket path (sockName) as the key. Since startSubscriber accepts connections in a loop and spawns a goroutine per connection, multiple concurrent publishers will overwrite each other in the global stats map and unregistering one connection can delete stats still in use by another. Consider using a unique per-connection key (e.g., include an incrementing instance or conn.RemoteAddr()), or use a stats-free reader here if reverse pubsub stats aren't needed.
Address review feedback:
- Add FrameReader with a reusable buffer that starts at 1KB and grows with 25% headroom as needed (up to maxsize). This avoids per-read allocations from framed.ReadFrame() while using less memory than the old fixed 65KB buffer pool for small-message topics.
- Enforce maxsize (10MB) on both send and receive paths: send methods return an error if the message exceeds maxsize; FrameReader rejects frames with a length prefix exceeding maxsize before allocating.
- Fix CheckMaxSize comment to reflect the actual 10MB limit.
- Update tests: restore "too large" test cases with 8MB strings that exceed the 10MB limit after base64 encoding.

Signed-off-by: Mikhail Malyshev <mike.malyshev@gmail.com>
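The grow-with-25%-headroom behavior this commit describes can be sketched as follows (ensure is an illustrative helper; the real FrameReader differs in detail):

```go
package main

import "fmt"

const maxsize = 10 * 1024 * 1024 // assumed cap on buffer growth

// ensure returns a slice of length n, reusing buf's backing array when it
// is large enough and otherwise reallocating with 25% headroom so the
// next slightly larger frame does not trigger another allocation.
func ensure(buf []byte, n int) []byte {
	if n <= cap(buf) {
		return buf[:n]
	}
	newCap := n + n/4 // 25% headroom
	if newCap > maxsize {
		newCap = maxsize
	}
	return make([]byte, n, newCap)
}

func main() {
	buf := make([]byte, 0, 1024)    // start at 1KB
	buf = ensure(buf, 512)          // fits: no reallocation
	fmt.Println(len(buf), cap(buf)) // 512 1024
	buf = ensure(buf, 4000)         // grows with headroom
	fmt.Println(len(buf), cap(buf)) // 4000 5000
}
```

Starting small and never shrinking keeps per-topic memory proportional to the largest frame actually seen, which is where the quoted savings over a fixed 65KB pool come from.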
30eaae6 to 707658f
Add optional per-topic FrameReader instrumentation: atomic counters (frames, bytes, grows, max frame size) and a 9-bucket size histogram per reader. A global registry maps topic names to stats entries.

StartStatsLogger writes a CSV to /persist/pubsub-frame-stats.csv at a configurable interval; tools/plot-frame-stats.py visualises the output with four panels (memory savings, per-topic buffer sizes, reallocation pressure, frame size distribution).

Stats collection is disabled by default and controlled at runtime via the debug.pubsub.stats.interval GlobalConfig key (value in seconds; 0 = off). zedbox subscribes to ConfigItemValueMap published by zedagent and calls StartStatsLogger / stop when the key changes.

Use newFrameReaderInternal (no stats registration) for the publisher's single-frame handshake read so it does not pollute the topic stats map. Bucket counters use sync/atomic to avoid a data race between the reader goroutine and the stats logger goroutine.

Signed-off-by: Mikhail Malyshev <mike.malyshev@gmail.com>
707658f to a295923
pubsub: switch from unixpacket to framed stream sockets

Summary

Replaces raw unixpacket (SOCK_SEQPACKET) sockets with unix (SOCK_STREAM) sockets using the getlantern/framed library with 4-byte little-endian length-prefix framing (EnableBigFrames()). This removes the hard OS-level 65 KB per-message limit and replaces it with a configurable 10 MB application-level limit.

This is three commits:
1. Wire protocol change
2. FrameReader arena + maxsize enforcement
3. Stats instrumentation (debug.pubsub.stats.interval controller key)

What Changed

Commit 1: Wire protocol
- unixpacket to unix stream sockets in socketdriver and reverse pubsub
- framed.Reader/framed.Writer + EnableBigFrames() (4-byte length prefix, little-endian)
- maxsize raised from 65535 to 10 MB (applies after base64 encoding, so the effective raw JSON limit is ≈ 7.5 MB)
- Removed ConnReadCheck; framed.ReadFrame() blocks correctly on stream sockets

Commit 2: FrameReader arena + maxsize
- FrameReader: per-reader buffer starting at 1 KB, growing 25% on demand, never shrinking
- Replaces the sync.Pool of 65 KB fixed buffers: ~95% memory reduction (measured: 63 KB arena vs 1.3 MB old pool for 21 active topics)
- maxsize enforced consistently on both send (> maxsize) and receive (> maxsize)
- CheckMaxSize helper for callers to validate before publishing

Commit 3: Stats instrumentation via GlobalConfig
- FrameReader atomic counters: frames, bytes, grows, max frame size, 9-bucket size histogram
- Global registry + GetAllReaderStats()
- CSV logger (StartStatsLogger) + matplotlib visualizer (plot-frame-stats.py)
- Controlled via debug.pubsub.stats.interval (seconds; 0 = off)
- newFrameReaderInternal (no stats) used for the publisher's single handshake read to avoid a stats key collision
- Bucket counters use sync/atomic to avoid a data race between reader and stats goroutines

Stats Run Results (4-minute boot on QEMU)

Key findings:
- Largest frames: ConfigItemValueMap (24.4 KB, 1 grow), DeviceNetworkStatus (7.0 KB, 2 grows), DevicePortConfigList (2.7 KB, 1 grow)

How to Test

Changelog Notes

Increased the pubsub IPC message size limit from 65 KB to 10 MB by switching from raw unixpacket datagram sockets to length-prefixed stream sockets. Memory usage reduced ~95% via per-reader arena buffers. Optional per-topic stats collection available via the debug.pubsub.stats.interval controller key.

PR Backports

Checklist
- I've provided a proper description
- I've added the proper documentation
- I've tested my PR on amd64 device (QEMU + onboarding in progress)
- I've tested my PR on arm64 device
- I've written the test verification instructions
- I've set the proper labels to this PR
- I've checked the boxes above, or I've provided a good reason why I didn't check them.