Skip to content

Commit 2e7a9fa

Browse files
committed
Implement rate limiter based on atomic int64 operations
1 parent 979a121 commit 2e7a9fa

13 files changed

Lines changed: 239 additions & 33 deletions

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,10 @@
22
/vendor
33
cover.html
44
cover.out
5+
profile.out
6+
stat.csv
7+
stat.txt
8+
stat.html
59

610
*.swp
11+
.idea

Makefile

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,18 @@ GO_FILES := $(shell \
66
-o -name '*.go' -print | cut -b3-)
77

88
.PHONY: bench
9-
bench:
10-
go test -bench=. ./...
9+
bench: bin/benchstat bin/benchart
10+
go test -timeout 3h -count=5 -run=xxx -bench=BenchmarkRateLimiter ./... | tee stat.txt
11+
@$(GOBIN)/benchstat stat.txt
12+
@$(GOBIN)/benchstat -csv stat.txt > stat.csv
13+
@$(GOBIN)/benchart 'RateLimiter;xAxisType=log' stat.csv stat.html
14+
@open stat.html
15+
16+
bin/benchstat: tools/go.mod
17+
@cd tools && go install golang.org/x/perf/cmd/benchstat
18+
19+
bin/benchart: tools/go.mod
20+
@cd tools && go install github.com/storozhukBM/benchart
1121

1222
bin/golint: tools/go.mod
1323
@cd tools && go install golang.org/x/lint/golint

go.mod

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
module go.uber.org/ratelimit
22

3-
go 1.14
3+
go 1.18
44

55
require (
6-
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129
6+
github.com/benbjohnson/clock v1.3.0
77
github.com/stretchr/testify v1.6.1
88
go.uber.org/atomic v1.7.0
99
)
10+
11+
require (
12+
github.com/davecgh/go-spew v1.1.1 // indirect
13+
github.com/pmezard/go-difflib v1.0.0 // indirect
14+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
15+
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
2-
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
1+
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
2+
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
33
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
44
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
55
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

limiter_atomic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
6464
}
6565

6666
// Take blocks to ensure that the time spent between multiple
67-
// Take calls is on average time.Second/rate.
67+
// Take calls is on average per/rate.
6868
func (t *atomicLimiter) Take() time.Time {
6969
var (
7070
newState state

limiter_atomic_int64.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright (c) 2022 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package ratelimit // import "go.uber.org/ratelimit"
22+
23+
import (
24+
"time"
25+
26+
"sync/atomic"
27+
)
28+
29+
type atomicInt64Limiter struct {
30+
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
31+
// of this rate limiter in case of collocation with other frequently accessed memory.
32+
prepadding [64]byte // cache line size = 64; created to avoid false sharing.
33+
state int64 // unix nanoseconds of the next permissions issue.
34+
//lint:ignore U1000 like prepadding.
35+
postpadding [56]byte // cache line size - state size = 64 - 8; created to avoid false sharing.
36+
37+
perRequest time.Duration
38+
maxSlack time.Duration
39+
clock Clock
40+
}
41+
42+
// newAtomicBased returns a new atomic based limiter.
43+
func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {
44+
// TODO consider moving config building to the implementation
45+
// independent code.
46+
config := buildConfig(opts)
47+
perRequest := config.per / time.Duration(rate)
48+
l := &atomicInt64Limiter{
49+
perRequest: perRequest,
50+
maxSlack: time.Duration(config.slack) * perRequest,
51+
clock: config.clock,
52+
}
53+
atomic.StoreInt64(&l.state, 0)
54+
return l
55+
}
56+
57+
// Take blocks to ensure that the time spent between multiple
58+
// Take calls is on average time.Second/rate.
59+
func (t *atomicInt64Limiter) Take() time.Time {
60+
var (
61+
newTimeOfNextPermissionIssue int64
62+
now int64
63+
)
64+
for {
65+
now = t.clock.Now().UnixNano()
66+
timeOfNextPermissionIssue := atomic.LoadInt64(&t.state)
67+
68+
switch {
69+
case timeOfNextPermissionIssue == 0:
70+
// If this is our first request, then we allow it.
71+
newTimeOfNextPermissionIssue = now
72+
case now-timeOfNextPermissionIssue > int64(t.maxSlack):
73+
// a lot of nanoseconds passed since the last Take call
74+
// we will limit max accumulated time to maxSlack
75+
newTimeOfNextPermissionIssue = now - int64(t.maxSlack)
76+
default:
77+
// calculate the time at which our permission was issued
78+
newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)
79+
}
80+
81+
if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {
82+
break
83+
}
84+
}
85+
nanosToSleepUntilOurPermissionIsIssued := newTimeOfNextPermissionIssue - now
86+
if nanosToSleepUntilOurPermissionIsIssued > 0 {
87+
t.clock.Sleep(time.Duration(nanosToSleepUntilOurPermissionIsIssued))
88+
}
89+
return time.Unix(0, newTimeOfNextPermissionIssue)
90+
}

limiter_mutexbased.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func newMutexBased(rate int, opts ...Option) *mutexLimiter {
4949
}
5050

5151
// Take blocks to ensure that the time spent between multiple
52-
// Take calls is on average time.Second/rate.
52+
// Take calls is on average per/rate.
5353
func (t *mutexLimiter) Take() time.Time {
5454
t.Lock()
5555
defer t.Unlock()

ratelimit.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ package ratelimit // import "go.uber.org/ratelimit"
2323
import (
2424
"time"
2525

26-
"github.com/andres-erbsen/clock"
26+
"github.com/benbjohnson/clock"
2727
)
2828

2929
// Note: This file is inspired by:
@@ -54,7 +54,7 @@ type config struct {
5454

5555
// New returns a Limiter that will limit to the given RPS.
5656
func New(rate int, opts ...Option) Limiter {
57-
return newAtomicBased(rate, opts...)
57+
return newAtomicInt64Based(rate, opts...)
5858
}
5959

6060
// buildConfig combines defaults with options.

ratelimit_bench_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ func BenchmarkRateLimiter(b *testing.B) {
1414
for _, procs := range []int{1, 4, 8, 16} {
1515
runtime.GOMAXPROCS(procs)
1616
for name, limiter := range map[string]Limiter{
17-
"atomic": New(b.N * 10000000),
18-
"mutex": newMutexBased(b.N * 10000000),
17+
"atomic": newAtomicBased(b.N * 1000000000000),
18+
"atomic_int64": New(b.N * 1000000000000),
19+
"mutex": newMutexBased(b.N * 1000000000000),
1920
} {
2021
for ng := 1; ng < 16; ng++ {
2122
runner(b, name, procs, ng, limiter, count)
@@ -47,7 +48,9 @@ func BenchmarkRateLimiter(b *testing.B) {
4748
}
4849

4950
func runner(b *testing.B, name string, procs int, ng int, limiter Limiter, count *atomic.Int64) bool {
50-
return b.Run(fmt.Sprintf("type:%s-procs:%d-goroutines:%d", name, procs, ng), func(b *testing.B) {
51+
return b.Run(fmt.Sprintf("type:%s;max_procs:%d;goroutines:%d", name, procs, ng), func(b *testing.B) {
52+
b.ReportAllocs()
53+
5154
var wg sync.WaitGroup
5255
trigger := atomic.NewBool(true)
5356
n := b.N

ratelimit_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"go.uber.org/atomic"
99

10-
"github.com/andres-erbsen/clock"
10+
"github.com/benbjohnson/clock"
1111
"github.com/stretchr/testify/assert"
1212
)
1313

@@ -54,13 +54,23 @@ func runTest(t *testing.T, fn func(testRunner)) {
5454
return newAtomicBased(rate, opts...)
5555
},
5656
},
57+
{
58+
name: "atomic_int64",
59+
constructor: func(rate int, opts ...Option) Limiter {
60+
return newAtomicInt64Based(rate, opts...)
61+
},
62+
},
5763
}
5864

5965
for _, tt := range impls {
6066
t.Run(tt.name, func(t *testing.T) {
67+
// Set a non-default time.Time since some limiters (int64 in particular) use
68+
// the default value as "non-initialized" state.
69+
clockMock := clock.NewMock()
70+
clockMock.Set(time.Now())
6171
r := runnerImpl{
6272
t: t,
63-
clock: clock.NewMock(),
73+
clock: clockMock,
6474
constructor: tt.constructor,
6575
doneCh: make(chan struct{}),
6676
}

0 commit comments

Comments
 (0)