[Issue 387] fix goroutine leak for closing consumers.#808
[Issue 387] fix goroutine leak for closing consumers.#808crossoverJie wants to merge 7 commits intoapache:masterfrom
Conversation
pgier
left a comment
There was a problem hiding this comment.
Is there any chance that c.messageCh can be nil? Just wondering if we need a nil check around closing it since trying to close a nil channel can cause a panic.
Otherwise LGTM
Thanks for the reminder. pulsar-client-go/pulsar/consumer_impl.go Line 101 in 6a8e7f3 Initialization is done when the consumer is created. |
|
Looks like there is a failure in one of the tests that we're trying to close an already closed channel. I guess in some cases the channel gets closed and others it doesn't? |
|
@pgier The This is the reason why |
| wg.Wait() | ||
| close(c.closeCh) | ||
| closed := closeChanSet[c.messageCh] | ||
| if !closed { |
There was a problem hiding this comment.
closeChanSet is a global variable. The element in the set is never deleted. Would it be more reasonable to create a isClosed (bool) attribute in the consumer struct. It can be checked at line 569 and 570 as
if !c.isClosed {
close(c.messageCh)
}
I just do not understand why there is a need to use a global map to track the messageCh channel which will not be GCed. Isn't a leak?
There was a problem hiding this comment.
That's what I thought before, but in the case of multiTopicConsumer it causes the chan to be closed repeatedly.
This is because although the consumers are multiple instances, they use the same messageCh and can only be closed once.
So I'd like to record the closed flag for messageCh via a global variable.
With regard to the closeChanSet leak, we can set it to nil in the client.Close() method.
pulsar-client-go/pulsar/client_impl.go
Line 212 in 6a8e7f3
Is there a better way?
Gleiphir2769
left a comment
There was a problem hiding this comment.
Why not use Sync.Once()? Let closeMsgChOnce be a attribute in the consumer.
c.closeMsgChOnce.Do(func() {
close(c.messageCh)
})
multiTopicConsumer uses newInternalConsumer to creat. You can add a parameter closeMsgChOnce in newInternalConsumer, then the consumers also share the Sync.once().
pulsar-client-go/pulsar/consumer_impl.go
Lines 208 to 209 in 6a8e7f3
|
@Gleiphir2769 Good suggestion, thx. |
| close(c.messageCh) | ||
| closeChanSet[c.messageCh] = true | ||
| } | ||
| }) |
There was a problem hiding this comment.
Please remove these redundant code comments.
# Conflicts: # pulsar/consumer_regex.go

Fixes #387
Motivation
Fix goroutine leak for closing consumers.
Modifications
Close the
c.messageChat the same time as closing the consumer.Before fix: