Skip to content

Commit ea8ae87

Browse files
committed
Message tracing requires client publish permission for Nats-Trace-Dest
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent 3fd43a0 commit ea8ae87

3 files changed

Lines changed: 237 additions & 8 deletions

File tree

server/client.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2739,8 +2739,8 @@ func (c *client) processHeaderPub(arg, remaining []byte) error {
27392739
// Do this only for CLIENT connections.
27402740
if c.kind == CLIENT && c.pa.hdr > 0 && len(remaining) > 0 {
27412741
hdr := remaining[:min(len(remaining), c.pa.hdr)]
2742-
if td := getHeader(MsgTraceDest, hdr); len(td) > 0 {
2743-
c.initAndSendIngressErrEvent(hdr, string(td), ErrMaxPayload)
2742+
if td, ok := c.allowedMsgTraceDest(hdr, false); ok && td != _EMPTY_ {
2743+
c.initAndSendIngressErrEvent(hdr, td, ErrMaxPayload)
27442744
}
27452745
}
27462746
c.maxPayloadViolation(c.pa.size, maxPayload)
@@ -3980,6 +3980,41 @@ func (c *client) pubAllowed(subject string) bool {
39803980
return c.pubAllowedFullCheck(subject, true, false)
39813981
}
39823982

3983+
// allowedMsgTraceDest returns the trace destination if present and authorized.
3984+
// It only considers static publish permissions and does not consume dynamic
3985+
// reply permissions because the client is not publishing the trace event itself.
3986+
func (c *client) allowedMsgTraceDest(hdr []byte, hasLock bool) (string, bool) {
3987+
if len(hdr) == 0 {
3988+
return _EMPTY_, true
3989+
}
3990+
td := sliceHeader(MsgTraceDest, hdr)
3991+
if len(td) == 0 {
3992+
return _EMPTY_, true
3993+
}
3994+
dest := bytesToString(td)
3995+
if c.kind == CLIENT {
3996+
if hasGWRoutedReplyPrefix(td) {
3997+
return dest, false
3998+
}
3999+
var acc *Account
4000+
var srv *Server
4001+
if !hasLock {
4002+
c.mu.Lock()
4003+
}
4004+
acc, srv = c.acc, c.srv
4005+
if !hasLock {
4006+
c.mu.Unlock()
4007+
}
4008+
if bytes.HasPrefix(td, clientNRGPrefix) && srv != nil && acc != srv.SystemAccount() {
4009+
return dest, false
4010+
}
4011+
}
4012+
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowedFullCheck(dest, false, hasLock) {
4013+
return dest, false
4014+
}
4015+
return dest, true
4016+
}
4017+
39834018
// pubAllowedFullCheck checks on all publish permissioning depending
39844019
// on the flag for dynamic reply permissions.
39854020
func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bool {
@@ -4115,10 +4150,19 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
41154150
genidAddr := &acc.sl.genid
41164151

41174152
// Check pub permissions
4118-
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowedFullCheck(string(c.pa.subject), true, true) {
4119-
c.mu.Unlock()
4120-
c.pubPermissionViolation(c.pa.subject)
4121-
return false, true
4153+
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) {
4154+
if !c.pubAllowedFullCheck(string(c.pa.subject), true, true) {
4155+
c.mu.Unlock()
4156+
c.pubPermissionViolation(c.pa.subject)
4157+
return false, true
4158+
}
4159+
}
4160+
if c.pa.hdr > 0 {
4161+
if td, ok := c.allowedMsgTraceDest(msg[:c.pa.hdr], true); !ok {
4162+
c.mu.Unlock()
4163+
c.pubPermissionViolation(stringToBytes(td))
4164+
return false, true
4165+
}
41224166
}
41234167
c.mu.Unlock()
41244168

server/msgtrace.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,13 @@ func (c *client) initMsgTrace() *msgTrace {
367367
}
368368
}
369369
dest = getHdrVal(MsgTraceDest)
370+
if c.kind == CLIENT {
371+
if td, ok := c.allowedMsgTraceDest(hdr, false); !ok {
372+
return nil
373+
} else if td != _EMPTY_ {
374+
dest = td
375+
}
376+
}
370377
// Check the destination to see if this is a valid public subject.
371378
if !IsValidPublishSubject(dest) {
372379
// We still have to return a msgTrace object (if traceOnly is set)

server/msgtrace_test.go

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,58 @@ func TestMsgTraceIngressMaxPayloadErrorDoesNotScanPayloadForTraceDest(t *testing
395395
}
396396
}
397397

398+
func TestMsgTraceIngressMaxPayloadErrorRequiresPublishPermissionForTraceDest(t *testing.T) {
399+
conf := createConfFile(t, []byte(`
400+
listen: 127.0.0.1:-1
401+
max_payload: 1024
402+
accounts {
403+
A {
404+
users: [
405+
{
406+
user: tracer
407+
password: pwd
408+
permissions {
409+
subscribe: ["my.trace.subj"]
410+
publish: ["my.trace.subj"]
411+
}
412+
},
413+
{
414+
user: pub
415+
password: pwd
416+
permissions {
417+
publish: ["foo"]
418+
}
419+
}
420+
]
421+
}
422+
}
423+
`))
424+
s, o := RunServerWithConfig(conf)
425+
defer s.Shutdown()
426+
427+
nct := natsConnect(t, s.ClientURL(), nats.UserInfo("tracer", "pwd"))
428+
defer nct.Close()
429+
traceSub := natsSubSync(t, nct, "my.trace.subj")
430+
natsFlush(t, nct)
431+
checkSubInterest(t, s, "A", "my.trace.subj", time.Second)
432+
433+
c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", o.Port))
434+
require_NoError(t, err)
435+
defer c.Close()
436+
437+
_, err = c.Write([]byte("CONNECT {\"user\":\"pub\",\"pass\":\"pwd\",\"protocol\":1,\"headers\":true,\"no_responders\":true}\r\n"))
438+
require_NoError(t, err)
439+
440+
hdr := fmt.Sprintf("%s%s:%s\r\n\r\n", hdrLine, MsgTraceDest, traceSub.Subject)
441+
hPub := fmt.Sprintf("HPUB foo %d 2048\r\n%sAAAAAAAAAAAAAAAAAA...", len(hdr), hdr)
442+
_, err = c.Write([]byte(hPub))
443+
require_NoError(t, err)
444+
445+
if traceMsg, err := traceSub.NextMsg(250 * time.Millisecond); err == nil {
446+
t.Fatalf("Should not have received trace message: %s", traceMsg.Data)
447+
}
448+
}
449+
398450
func TestMsgTraceIngressErrors(t *testing.T) {
399451
conf := createConfFile(t, []byte(`
400452
port: -1
@@ -407,7 +459,7 @@ func TestMsgTraceIngressErrors(t *testing.T) {
407459
permissions {
408460
subscribe: ["my.trace.subj", "foo"]
409461
publish {
410-
allow: ["foo", "bar.>"]
462+
allow: ["foo", "bar.>", "my.trace.subj"]
411463
deny: ["bar.baz"]
412464
}
413465
}
@@ -487,7 +539,7 @@ func TestMsgTraceEgressErrors(t *testing.T) {
487539
deny: "bar.bat"
488540
}
489541
publish {
490-
allow: ["foo", "bar.>"]
542+
allow: ["foo", "bar.>", "my.trace.subj"]
491543
deny: ["bar.baz"]
492544
}
493545
}
@@ -633,6 +685,132 @@ func TestMsgTraceEgressErrors(t *testing.T) {
633685
}
634686
}
635687

688+
func TestMsgTraceIngressRequiresPublishPermissionForTraceDest(t *testing.T) {
689+
conf := createConfFile(t, []byte(`
690+
port: -1
691+
accounts {
692+
A {
693+
users: [
694+
{
695+
user: tracer
696+
password: pwd
697+
permissions {
698+
subscribe: ["my.trace.subj", "foo"]
699+
publish: ["my.trace.subj"]
700+
}
701+
},
702+
{
703+
user: pub
704+
password: pwd
705+
permissions {
706+
publish: ["foo"]
707+
}
708+
}
709+
]
710+
}
711+
}
712+
`))
713+
s, _ := RunServerWithConfig(conf)
714+
defer s.Shutdown()
715+
716+
nc := natsConnect(t, s.ClientURL(), nats.UserInfo("tracer", "pwd"))
717+
defer nc.Close()
718+
719+
tsub := natsSubSync(t, nc, "my.trace.subj")
720+
asub := natsSubSync(t, nc, "foo")
721+
natsFlush(t, nc)
722+
723+
ncp, err := nats.Connect(
724+
s.ClientURL(),
725+
nats.UserInfo("pub", "pwd"),
726+
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {}),
727+
)
728+
require_NoError(t, err)
729+
defer ncp.Close()
730+
731+
msg := nats.NewMsg("foo")
732+
msg.Header.Set(MsgTraceDest, tsub.Subject)
733+
msg.Data = []byte("hello")
734+
735+
require_NoError(t, ncp.PublishMsg(msg))
736+
natsFlush(t, ncp)
737+
738+
err = ncp.LastError()
739+
require_Error(t, err)
740+
require_Contains(t, err.Error(), fmt.Sprintf("Permissions Violation for Publish to %q", tsub.Subject))
741+
742+
if m, err := asub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout {
743+
t.Fatalf("Did not expect application message, got %v / %v", m, err)
744+
}
745+
if tm, err := tsub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout {
746+
t.Fatalf("Did not expect trace message, got %v / %v", tm, err)
747+
}
748+
}
749+
750+
func TestMsgTraceIngressRejectsReservedTraceDest(t *testing.T) {
751+
conf := createConfFile(t, []byte(`
752+
port: -1
753+
accounts {
754+
A {
755+
users: [
756+
{
757+
user: sub
758+
password: pwd
759+
permissions {
760+
subscribe: ["foo"]
761+
}
762+
},
763+
{
764+
user: pub
765+
password: pwd
766+
permissions {
767+
publish: [">"]
768+
}
769+
},
770+
{
771+
user: pub2
772+
password: pwd
773+
}
774+
]
775+
}
776+
}
777+
`))
778+
s, _ := RunServerWithConfig(conf)
779+
defer s.Shutdown()
780+
781+
ncs := natsConnect(t, s.ClientURL(), nats.UserInfo("sub", "pwd"))
782+
defer ncs.Close()
783+
asub := natsSubSync(t, ncs, "foo")
784+
natsFlush(t, ncs)
785+
786+
for _, user := range []string{"pub", "pub2"} {
787+
ncp, err := nats.Connect(
788+
s.ClientURL(),
789+
nats.UserInfo(user, "pwd"),
790+
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {}),
791+
)
792+
require_NoError(t, err)
793+
defer ncp.Close()
794+
795+
for _, traceDest := range []string{gwReplyPrefix + "trace", "$NRG.trace"} {
796+
msg := nats.NewMsg("foo")
797+
msg.Header.Set(MsgTraceDest, traceDest)
798+
msg.Data = []byte("hello")
799+
800+
require_NoError(t, ncp.PublishMsg(msg))
801+
natsFlush(t, ncp)
802+
803+
err = ncp.LastError()
804+
require_Error(t, err)
805+
require_Contains(t, err.Error(), fmt.Sprintf("Permissions Violation for Publish to %q", traceDest))
806+
807+
if m, err := asub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout {
808+
t.Fatalf("Did not expect application message, got %v / %v", m, err)
809+
}
810+
}
811+
}
812+
}
813+
636814
func TestMsgTraceWithQueueSub(t *testing.T) {
637815
o := DefaultOptions()
638816
s := RunServer(o)

0 commit comments

Comments
 (0)