Skip to content

Commit 0e6d083

Browse files
committed
Restore int64 based atomic rate limiter (uber-go#94)
This limiter was introduced and merged in the following PR uber-go#85 Later @twelsh-aw found an issue with this implementation uber-go#90 So @rabbbit reverted this change in uber-go#91 Our tests did not detect this issue, so we have a separate PR uber-go#93 that enhances our tests approach to detect potential errors better. With this PR, we want to restore the int64-based atomic rate limiter implementation as a non-default rate limiter and then check that uber-go#93 will detect the bug. Right after it, we'll open a subsequent PR to fix this bug.
1 parent 4d08695 commit 0e6d083

4 files changed

Lines changed: 103 additions & 3 deletions

File tree

limiter_atomic_int64.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright (c) 2022 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package ratelimit // import "go.uber.org/ratelimit"
22+
23+
import (
24+
"time"
25+
26+
"sync/atomic"
27+
)
28+
29+
type atomicInt64Limiter struct {
30+
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
31+
// of this rate limiter in case of collocation with other frequently accessed memory.
32+
prepadding [64]byte // cache line size = 64; created to avoid false sharing.
33+
state int64 // unix nanoseconds of the next permissions issue.
34+
//lint:ignore U1000 like prepadding.
35+
postpadding [56]byte // cache line size - state size = 64 - 8; created to avoid false sharing.
36+
37+
perRequest time.Duration
38+
maxSlack time.Duration
39+
clock Clock
40+
}
41+
42+
// newAtomicBased returns a new atomic based limiter.
43+
func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {
44+
// TODO consider moving config building to the implementation
45+
// independent code.
46+
config := buildConfig(opts)
47+
perRequest := config.per / time.Duration(rate)
48+
l := &atomicInt64Limiter{
49+
perRequest: perRequest,
50+
maxSlack: time.Duration(config.slack) * perRequest,
51+
clock: config.clock,
52+
}
53+
atomic.StoreInt64(&l.state, 0)
54+
return l
55+
}
56+
57+
// Take blocks to ensure that the time spent between multiple
58+
// Take calls is on average time.Second/rate.
59+
func (t *atomicInt64Limiter) Take() time.Time {
60+
var (
61+
newTimeOfNextPermissionIssue int64
62+
now int64
63+
)
64+
for {
65+
now = t.clock.Now().UnixNano()
66+
timeOfNextPermissionIssue := atomic.LoadInt64(&t.state)
67+
68+
switch {
69+
case timeOfNextPermissionIssue == 0:
70+
// If this is our first request, then we allow it.
71+
newTimeOfNextPermissionIssue = now
72+
case now-timeOfNextPermissionIssue > int64(t.maxSlack):
73+
// a lot of nanoseconds passed since the last Take call
74+
// we will limit max accumulated time to maxSlack
75+
newTimeOfNextPermissionIssue = now - int64(t.maxSlack)
76+
default:
77+
// calculate the time at which our permission was issued
78+
newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)
79+
}
80+
81+
if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {
82+
break
83+
}
84+
}
85+
nanosToSleepUntilOurPermissionIsIssued := newTimeOfNextPermissionIssue - now
86+
if nanosToSleepUntilOurPermissionIsIssued > 0 {
87+
t.clock.Sleep(time.Duration(nanosToSleepUntilOurPermissionIsIssued))
88+
}
89+
return time.Unix(0, newTimeOfNextPermissionIssue)
90+
}

ratelimit_bench_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ func BenchmarkRateLimiter(b *testing.B) {
1414
for _, procs := range []int{1, 4, 8, 16} {
1515
runtime.GOMAXPROCS(procs)
1616
for name, limiter := range map[string]Limiter{
17-
"atomic": New(b.N * 10000000),
18-
"mutex": newMutexBased(b.N * 10000000),
17+
"atomic": newAtomicBased(b.N * 1000000000000),
18+
"atomic_int64": newAtomicInt64Based(b.N * 1000000000000),
19+
"mutex": newMutexBased(b.N * 1000000000000),
1920
} {
2021
for ng := 1; ng < 16; ng++ {
2122
runner(b, name, procs, ng, limiter, count)
@@ -47,7 +48,9 @@ func BenchmarkRateLimiter(b *testing.B) {
4748
}
4849

4950
func runner(b *testing.B, name string, procs int, ng int, limiter Limiter, count *atomic.Int64) bool {
50-
return b.Run(fmt.Sprintf("type:%s-procs:%d-goroutines:%d", name, procs, ng), func(b *testing.B) {
51+
return b.Run(fmt.Sprintf("type:%s;max_procs:%d;goroutines:%d", name, procs, ng), func(b *testing.B) {
52+
b.ReportAllocs()
53+
5154
var wg sync.WaitGroup
5255
trigger := atomic.NewBool(true)
5356
n := b.N

ratelimit_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ func runTest(t *testing.T, fn func(testRunner)) {
6262
return newAtomicBased(rate, opts...)
6363
},
6464
},
65+
{
66+
name: "atomic_int64",
67+
constructor: func(rate int, opts ...Option) Limiter {
68+
return newAtomicInt64Based(rate, opts...)
69+
},
70+
},
6571
}
6672

6773
for _, tt := range impls {

tools/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111

1212
require (
1313
github.com/BurntSushi/toml v1.0.0 // indirect
14+
github.com/storozhukBM/benchart v1.0.0
1415
golang.org/x/exp/typeparams v0.0.0-20220328175248-053ad81199eb // indirect
1516
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
1617
golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f // indirect

0 commit comments

Comments
 (0)