Skip to content

Commit 137b9c0

Browse files
authored
Merge pull request #3415 from zach593/pq-buffer
✨ Use a buffer to optimize priority queue AddWithOpts performance
2 parents 5de4c4f + c47f9cb commit 137b9c0

2 files changed

Lines changed: 92 additions & 14 deletions

File tree

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ type Opts[T comparable] struct {
5454
// Opt allows to configure a PriorityQueue.
5555
type Opt[T comparable] func(*Opts[T])
5656

57+
type bufferItem[T comparable] struct {
58+
opts AddOpts
59+
items []T
60+
}
61+
5762
// New constructs a new PriorityQueue.
5863
func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5964
opts := &Opts[T]{}
@@ -70,11 +75,12 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
7075
}
7176

7277
pq := &priorityqueue[T]{
73-
log: opts.Log,
74-
items: map[T]*item[T]{},
75-
ready: btree.NewG(32, lessReady[T]),
76-
waiting: btree.NewG(32, lessWaiting[T]),
77-
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
78+
log: opts.Log,
79+
itemAddedToAddBuffer: make(chan struct{}, 1),
80+
items: map[T]*item[T]{},
81+
ready: btree.NewG(32, lessReady[T]),
82+
waiting: btree.NewG(32, lessWaiting[T]),
83+
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
7884
// readyItemOrWaiterAdded indicates that a ready item or
7985
// waiter was added. It must be buffered, because
8086
// if we currently process items we can't tell
@@ -89,6 +95,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8995
tick: time.Tick,
9096
}
9197

98+
go pq.handleAddBuffer()
9299
go pq.handleReadyItems()
93100
go pq.handleWaitingItems()
94101
go pq.logState()
@@ -101,6 +108,11 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
101108

102109
type priorityqueue[T comparable] struct {
103110
log logr.Logger
111+
112+
addBufferLock sync.Mutex
113+
addBuffer []bufferItem[T]
114+
itemAddedToAddBuffer chan struct{}
115+
104116
// lock has to be acquired for any access to any of items, ready, waiting,
105117
// addedCounter or waiters.
106118
lock sync.Mutex
@@ -143,12 +155,53 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
143155
return
144156
}
145157

158+
if len(items) == 0 {
159+
return
160+
}
161+
162+
w.addBufferLock.Lock()
163+
w.addBuffer = append(w.addBuffer, bufferItem[T]{
164+
opts: o,
165+
items: items,
166+
})
167+
w.addBufferLock.Unlock()
168+
169+
w.notifyItemAddedToAddBuffer()
170+
}
171+
172+
func (w *priorityqueue[T]) handleAddBuffer() {
173+
for {
174+
select {
175+
case <-w.done:
176+
return
177+
case <-w.itemAddedToAddBuffer:
178+
}
179+
180+
w.lock.Lock()
181+
w.lockedFlushAddBuffer()
182+
w.lock.Unlock()
183+
}
184+
}
185+
186+
func (w *priorityqueue[T]) lockedFlushAddBuffer() {
187+
w.addBufferLock.Lock()
188+
buffer := w.addBuffer
189+
w.addBuffer = make([]bufferItem[T], 0, len(buffer))
190+
w.addBufferLock.Unlock()
191+
192+
for _, v := range buffer {
193+
w.lockedAddWithOpts(v.opts, v.items...)
194+
}
195+
}
196+
197+
func (w *priorityqueue[T]) lockedAddWithOpts(o AddOpts, items ...T) {
198+
if w.shutdown.Load() {
199+
return
200+
}
201+
146202
var readyItemAdded bool
147203
var waitingItemAddedOrUpdated bool
148204

149-
w.lock.Lock()
150-
defer w.lock.Unlock()
151-
152205
for _, key := range items {
153206
after := o.After
154207
if o.RateLimited {
@@ -232,6 +285,13 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
232285
}
233286
}
234287

288+
func (w *priorityqueue[T]) notifyItemAddedToAddBuffer() {
289+
select {
290+
case w.itemAddedToAddBuffer <- struct{}{}:
291+
default:
292+
}
293+
}
294+
235295
func (w *priorityqueue[T]) notifyReadyItemOrWaiterAdded() {
236296
select {
237297
case w.readyItemOrWaiterAdded <- struct{}{}:
@@ -309,6 +369,12 @@ func (w *priorityqueue[T]) handleReadyItems() {
309369
w.lock.Lock()
310370
defer w.lock.Unlock()
311371

372+
// Flush is performed before reading items to avoid errors caused by asynchronous behavior,
373+
// primarily for unit testing purposes.
374+
// Successfully adding a ready item may result in an additional call to handleReadyItems(),
375+
// but the cost is negligible.
376+
w.lockedFlushAddBuffer()
377+
312378
if w.waiters == 0 {
313379
return
314380
}
@@ -424,6 +490,10 @@ func (w *priorityqueue[T]) Len() int {
424490
w.lock.Lock()
425491
defer w.lock.Unlock()
426492

493+
// Flush is performed before reading items to avoid errors caused by asynchronous behavior,
494+
// primarily for unit testing purposes.
495+
w.lockedFlushAddBuffer()
496+
427497
return w.ready.Len()
428498
}
429499

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,12 +371,20 @@ func BenchmarkAddOnly(b *testing.B) {
371371
func BenchmarkAddLockContended(b *testing.B) {
372372
q := New[int]("")
373373
defer q.ShutDown()
374-
go func() {
375-
for range 1000 {
376-
item, _ := q.Get()
377-
q.Done(item)
378-
}
379-
}()
374+
375+
for i := range 1000000 {
376+
q.Add(i)
377+
}
378+
379+
for range 1000 {
380+
go func() {
381+
for {
382+
item, _ := q.Get()
383+
time.Sleep(1 * time.Millisecond)
384+
q.Done(item)
385+
}
386+
}()
387+
}
380388

381389
for b.Loop() {
382390
for i := range 1000 {

0 commit comments

Comments
 (0)