Skip to content

Commit 6747e0f

Browse files
authored
[codex] Add latency phase metrics (#634)
* Add latency phase metrics
* Rename upstream latency metric
* Fix upstream timing boundary
* Align request metrics with latency counts
* Clarify unknown request metrics
1 parent 021b3ab commit 6747e0f

13 files changed

Lines changed: 346 additions & 128 deletions

app/integration.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,16 @@ func prepareIntegration(i *Integration) error {
234234
}
235235

236236
i.proxy.ModifyResponse = func(resp *http.Response) error {
237+
start := time.Now()
238+
if t, ok := metrics.UpstreamRoundtripStart(resp.Request.Context()); ok {
239+
metrics.RecordUpstreamRoundtripDuration(i.Name, time.Since(t))
240+
}
237241
caller := metrics.Caller(resp.Request.Context())
238242
metrics.OnResponse(i.Name, caller, resp.Request, resp)
239243
if resp.StatusCode < http.StatusOK || resp.StatusCode >= 300 {
240244
resp.Header.Set("X-AT-Upstream-Error", "true")
241245
}
246+
metrics.RecordResponseProcessingDuration(i.Name, time.Since(start))
242247
return nil
243248
}
244249

app/main.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,16 @@ const (
12961296

12971297
// proxyHandler handles incoming requests and proxies them according to the integration.
12981298
func proxyHandler(w http.ResponseWriter, r *http.Request) {
1299+
start := time.Now()
1300+
integrationName := "unknown"
1301+
preProxyRecorded := false
1302+
defer func() {
1303+
if !preProxyRecorded {
1304+
metrics.RecordPreProxyDuration(integrationName, time.Since(start))
1305+
}
1306+
metrics.RecordEndToEndDuration(integrationName, time.Since(start))
1307+
}()
1308+
12991309
host := r.Host
13001310
if !*disableXATInt {
13011311
hdr := r.Header.Get("X-AT-Int")
@@ -1315,6 +1325,8 @@ func proxyHandler(w http.ResponseWriter, r *http.Request) {
13151325
http.Error(w, fmt.Sprintf("integration for host %s not found", host), http.StatusNotFound)
13161326
return
13171327
}
1328+
integrationName = integ.Name
1329+
metrics.IncRequest(integ.Name)
13181330

13191331
clientIP, _, err := net.SplitHostPort(r.RemoteAddr)
13201332
if err != nil {
@@ -1463,6 +1475,10 @@ func proxyHandler(w http.ResponseWriter, r *http.Request) {
14631475
}
14641476

14651477
metrics.OnRequest(integ.Name, r)
1478+
handoffStart := time.Now()
1479+
r = r.WithContext(metrics.WithUpstreamRoundtripStart(r.Context(), handoffStart))
1480+
metrics.RecordPreProxyDuration(integ.Name, handoffStart.Sub(start))
1481+
preProxyRecorded = true
14661482
rec := &statusRecorder{ResponseWriter: w}
14671483
integ.proxy.ServeHTTP(rec, r)
14681484
if rec.status == 0 {

app/metrics/builtin.go

Lines changed: 98 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,17 @@ import (
2020
const metricKeySeparator = "|"
2121

2222
var (
23-
requestCounts = expvar.NewMap("authtranslator_requests_total")
24-
rateLimitCounts = expvar.NewMap("authtranslator_rate_limit_events_total")
25-
authFailureCounts = expvar.NewMap("authtranslator_auth_failures_total")
26-
internalResponseCounts = expvar.NewMap("authtranslator_internal_responses_total")
27-
upstreamStatusCounts = expvar.NewMap("authtranslator_upstream_responses_total")
28-
requestDurations = expvar.NewMap("authtranslator_request_duration_seconds")
29-
LastReloadTime = expvar.NewString("authtranslator_last_reload")
30-
durationHistsMu sync.Mutex
31-
durationHists = make(map[string]*histogram)
32-
durationBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10}
23+
requestCounts = expvar.NewMap("authtranslator_requests_total")
24+
rateLimitCounts = expvar.NewMap("authtranslator_rate_limit_events_total")
25+
authFailureCounts = expvar.NewMap("authtranslator_auth_failures_total")
26+
internalResponseCounts = expvar.NewMap("authtranslator_internal_responses_total")
27+
upstreamStatusCounts = expvar.NewMap("authtranslator_upstream_responses_total")
28+
upstreamRoundtripDurations = newDurationMetric("authtranslator_upstream_roundtrip_duration_seconds")
29+
endToEndDurations = newDurationMetric("authtranslator_end_to_end_duration_seconds")
30+
preProxyDurations = newDurationMetric("authtranslator_pre_proxy_duration_seconds")
31+
responseProcessingDurations = newDurationMetric("authtranslator_response_processing_duration_seconds")
32+
LastReloadTime = expvar.NewString("authtranslator_last_reload")
33+
durationBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10}
3334
)
3435

3536
type histogram struct {
@@ -39,13 +40,29 @@ type histogram struct {
3940
sum float64
4041
}
4142

43+
type durationMetric struct {
44+
name string
45+
exp *expvar.Map
46+
47+
mu sync.Mutex
48+
hists map[string]*histogram
49+
}
50+
4251
func newHistogram() *histogram {
4352
return &histogram{
4453
buckets: durationBuckets,
4554
counts: make([]uint64, len(durationBuckets)+1),
4655
}
4756
}
4857

58+
func newDurationMetric(name string) *durationMetric {
59+
return &durationMetric{
60+
name: name,
61+
exp: expvar.NewMap(name),
62+
hists: make(map[string]*histogram),
63+
}
64+
}
65+
4966
func init() {
5067
LastReloadTime.Set(time.Now().Format(time.RFC3339))
5168
}
@@ -89,7 +106,7 @@ func (h *histogram) String() string {
89106
return string(buf)
90107
}
91108

92-
func (h *histogram) writeProm(w http.ResponseWriter, integ string) {
109+
func (h *histogram) writeProm(w http.ResponseWriter, name, integ string) {
93110
h.mu.Lock()
94111
buckets := append([]float64(nil), h.buckets...)
95112
counts := append([]uint64(nil), h.counts...)
@@ -98,12 +115,51 @@ func (h *histogram) writeProm(w http.ResponseWriter, integ string) {
98115
var cum uint64
99116
for i, b := range buckets {
100117
cum += counts[i]
101-
fmt.Fprintf(w, "authtranslator_request_duration_seconds_bucket{integration=%q,le=%q} %d\n", integ, strconv.FormatFloat(b, 'f', -1, 64), cum)
118+
fmt.Fprintf(w, "%s_bucket{integration=%q,le=%q} %d\n", name, integ, strconv.FormatFloat(b, 'f', -1, 64), cum)
102119
}
103120
cum += counts[len(buckets)]
104-
fmt.Fprintf(w, "authtranslator_request_duration_seconds_bucket{integration=%q,le=\"+Inf\"} %d\n", integ, cum)
105-
fmt.Fprintf(w, "authtranslator_request_duration_seconds_sum{integration=%q} %f\n", integ, sum)
106-
fmt.Fprintf(w, "authtranslator_request_duration_seconds_count{integration=%q} %d\n", integ, cum)
121+
fmt.Fprintf(w, "%s_bucket{integration=%q,le=\"+Inf\"} %d\n", name, integ, cum)
122+
fmt.Fprintf(w, "%s_sum{integration=%q} %f\n", name, integ, sum)
123+
fmt.Fprintf(w, "%s_count{integration=%q} %d\n", name, integ, cum)
124+
}
125+
126+
func (m *durationMetric) Record(integration string, d time.Duration) {
127+
m.mu.Lock()
128+
h, ok := m.hists[integration]
129+
if !ok {
130+
h = newHistogram()
131+
m.hists[integration] = h
132+
m.exp.Set(integration, h)
133+
}
134+
m.mu.Unlock()
135+
h.Observe(d.Seconds())
136+
}
137+
138+
func (m *durationMetric) Reset() {
139+
m.exp.Init()
140+
m.mu.Lock()
141+
m.hists = make(map[string]*histogram)
142+
m.mu.Unlock()
143+
}
144+
145+
func (m *durationMetric) WriteProm(w http.ResponseWriter) {
146+
writePromType(w, m.name, "histogram")
147+
148+
type histSnapshot struct {
149+
name string
150+
h *histogram
151+
}
152+
153+
m.mu.Lock()
154+
hists := make([]histSnapshot, 0, len(m.hists))
155+
for name, h := range m.hists {
156+
hists = append(hists, histSnapshot{name: name, h: h})
157+
}
158+
m.mu.Unlock()
159+
160+
for _, hs := range hists {
161+
hs.h.writeProm(w, m.name, hs.name)
162+
}
107163
}
108164

109165
// IncRequest increments the request counter for the integration.
@@ -128,17 +184,29 @@ func RecordStatus(integration string, status int) {
128184
upstreamStatusCounts.Add(key, 1)
129185
}
130186

131-
// RecordDuration records the upstream request duration.
132-
func RecordDuration(integration string, d time.Duration) {
133-
durationHistsMu.Lock()
134-
h, ok := durationHists[integration]
135-
if !ok {
136-
h = newHistogram()
137-
durationHists[integration] = h
138-
requestDurations.Set(integration, h)
139-
}
140-
durationHistsMu.Unlock()
141-
h.Observe(d.Seconds())
187+
// RecordUpstreamRoundtripDuration records the duration from proxy handoff until
188+
// AuthTranslator receives the upstream response.
189+
func RecordUpstreamRoundtripDuration(integration string, d time.Duration) {
190+
upstreamRoundtripDurations.Record(integration, d)
191+
}
192+
193+
// RecordEndToEndDuration records full request latency as observed by the
194+
// AuthTranslator handler.
195+
func RecordEndToEndDuration(integration string, d time.Duration) {
196+
endToEndDurations.Record(integration, d)
197+
}
198+
199+
// RecordPreProxyDuration records request-side processing time before
200+
// AuthTranslator proxies upstream or returns a local response.
201+
func RecordPreProxyDuration(integration string, d time.Duration) {
202+
preProxyDurations.Record(integration, d)
203+
}
204+
205+
// RecordResponseProcessingDuration records response-side processing time inside
206+
// AuthTranslator after an upstream response is received and before streaming the
207+
// body to the client begins.
208+
func RecordResponseProcessingDuration(integration string, d time.Duration) {
209+
responseProcessingDurations.Record(integration, d)
142210
}
143211

144212
func writePromType(w http.ResponseWriter, name, metricType string) {
@@ -152,20 +220,10 @@ func WriteProm(w http.ResponseWriter) {
152220
requestCounts.Do(func(kv expvar.KeyValue) {
153221
fmt.Fprintf(w, "authtranslator_requests_total{integration=%q} %s\n", kv.Key, kv.Value.String())
154222
})
155-
writePromType(w, "authtranslator_request_duration_seconds", "histogram")
156-
durationHistsMu.Lock()
157-
type histSnapshot struct {
158-
name string
159-
h *histogram
160-
}
161-
hists := make([]histSnapshot, 0, len(durationHists))
162-
for name, h := range durationHists {
163-
hists = append(hists, histSnapshot{name: name, h: h})
164-
}
165-
durationHistsMu.Unlock()
166-
for _, hs := range hists {
167-
hs.h.writeProm(w, hs.name)
168-
}
223+
upstreamRoundtripDurations.WriteProm(w)
224+
endToEndDurations.WriteProm(w)
225+
preProxyDurations.WriteProm(w)
226+
responseProcessingDurations.WriteProm(w)
169227
writePromType(w, "authtranslator_rate_limit_events_total", "counter")
170228
rateLimitCounts.Do(func(kv expvar.KeyValue) {
171229
fmt.Fprintf(w, "authtranslator_rate_limit_events_total{integration=%q} %s\n", kv.Key, kv.Value.String())

app/metrics/metrics_histogram_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestHistogramWriteProm(t *testing.T) {
5050
h.Observe(1)
5151

5252
rr := httptest.NewRecorder()
53-
h.writeProm(rr, "foo")
53+
h.writeProm(rr, "authtranslator_upstream_roundtrip_duration_seconds", "foo")
5454

5555
lines := strings.Split(strings.TrimSpace(rr.Body.String()), "\n")
5656
if len(lines) != 10 {
@@ -59,7 +59,7 @@ func TestHistogramWriteProm(t *testing.T) {
5959
if !strings.Contains(lines[0], "le=\"0.1\"") || !strings.HasSuffix(lines[0], " 0") {
6060
t.Fatalf("unexpected first line: %s", lines[0])
6161
}
62-
if !strings.HasPrefix(lines[8], "authtranslator_request_duration_seconds_sum{integration=\"foo\"}") {
62+
if !strings.HasPrefix(lines[8], "authtranslator_upstream_roundtrip_duration_seconds_sum{integration=\"foo\"}") {
6363
t.Fatalf("missing sum line")
6464
}
6565
if !strings.HasSuffix(lines[9], " 2") {

app/metrics/metrics_test.go

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ func TestMetricsHandlerEmpty(t *testing.T) {
4949
body := rr.Body.String()
5050
for _, line := range []string{
5151
"# TYPE authtranslator_requests_total counter",
52-
"# TYPE authtranslator_request_duration_seconds histogram",
52+
"# TYPE authtranslator_upstream_roundtrip_duration_seconds histogram",
53+
"# TYPE authtranslator_end_to_end_duration_seconds histogram",
54+
"# TYPE authtranslator_pre_proxy_duration_seconds histogram",
55+
"# TYPE authtranslator_response_processing_duration_seconds histogram",
5356
"# TYPE authtranslator_rate_limit_events_total counter",
5457
"# TYPE authtranslator_auth_failures_total counter",
5558
"# TYPE authtranslator_internal_responses_total counter",
@@ -74,9 +77,15 @@ func TestMetricsHandlerOutput(t *testing.T) {
7477
RecordStatus("foo", http.StatusOK)
7578
RecordStatus("bar", http.StatusTeapot)
7679
IncRequest("bar")
77-
RecordDuration("foo", 100*time.Millisecond)
78-
RecordDuration("foo", 200*time.Millisecond)
79-
RecordDuration("bar", 50*time.Millisecond)
80+
RecordUpstreamRoundtripDuration("foo", 100*time.Millisecond)
81+
RecordUpstreamRoundtripDuration("foo", 200*time.Millisecond)
82+
RecordUpstreamRoundtripDuration("bar", 50*time.Millisecond)
83+
RecordEndToEndDuration("foo", 250*time.Millisecond)
84+
RecordEndToEndDuration("bar", 75*time.Millisecond)
85+
RecordPreProxyDuration("foo", 20*time.Millisecond)
86+
RecordPreProxyDuration("bar", 10*time.Millisecond)
87+
RecordResponseProcessingDuration("foo", 15*time.Millisecond)
88+
RecordResponseProcessingDuration("bar", 5*time.Millisecond)
8089

8190
req := httptest.NewRequest(http.MethodGet, "/_at_internal/metrics", nil)
8291
rr := httptest.NewRecorder()
@@ -89,7 +98,10 @@ func TestMetricsHandlerOutput(t *testing.T) {
8998
body := rr.Body.String()
9099
for _, line := range []string{
91100
"# TYPE authtranslator_requests_total counter",
92-
"# TYPE authtranslator_request_duration_seconds histogram",
101+
"# TYPE authtranslator_upstream_roundtrip_duration_seconds histogram",
102+
"# TYPE authtranslator_end_to_end_duration_seconds histogram",
103+
"# TYPE authtranslator_pre_proxy_duration_seconds histogram",
104+
"# TYPE authtranslator_response_processing_duration_seconds histogram",
93105
"# TYPE authtranslator_rate_limit_events_total counter",
94106
"# TYPE authtranslator_auth_failures_total counter",
95107
"# TYPE authtranslator_internal_responses_total counter",
@@ -120,11 +132,20 @@ func TestMetricsHandlerOutput(t *testing.T) {
120132
if !strings.Contains(body, `authtranslator_upstream_responses_total{integration="bar",code="418"} 1`) {
121133
t.Fatal("missing bar status metric")
122134
}
123-
if !strings.Contains(body, `authtranslator_request_duration_seconds_sum{integration="foo"}`) {
124-
t.Fatal("missing foo duration histogram")
135+
if !strings.Contains(body, `authtranslator_upstream_roundtrip_duration_seconds_sum{integration="foo"}`) {
136+
t.Fatal("missing foo upstream duration histogram")
125137
}
126-
if !strings.Contains(body, `authtranslator_request_duration_seconds_sum{integration="bar"}`) {
127-
t.Fatal("missing bar duration histogram")
138+
if !strings.Contains(body, `authtranslator_upstream_roundtrip_duration_seconds_sum{integration="bar"}`) {
139+
t.Fatal("missing bar upstream duration histogram")
140+
}
141+
if !strings.Contains(body, `authtranslator_end_to_end_duration_seconds_sum{integration="foo"}`) {
142+
t.Fatal("missing foo end-to-end duration histogram")
143+
}
144+
if !strings.Contains(body, `authtranslator_pre_proxy_duration_seconds_sum{integration="foo"}`) {
145+
t.Fatal("missing foo pre-proxy duration histogram")
146+
}
147+
if !strings.Contains(body, `authtranslator_response_processing_duration_seconds_sum{integration="foo"}`) {
148+
t.Fatal("missing foo response processing duration histogram")
128149
}
129150
}
130151

@@ -135,7 +156,7 @@ func TestMetricsHandlerOutputWithPunctuation(t *testing.T) {
135156
underscoreName := "with_underscore"
136157

137158
IncRequest(dotName)
138-
RecordDuration(dotName, 150*time.Millisecond)
159+
RecordUpstreamRoundtripDuration(dotName, 150*time.Millisecond)
139160
RecordStatus(dotName, http.StatusAccepted)
140161
IncInternalResponse(dotName, http.StatusBadRequest, "invalid_destination")
141162

@@ -149,7 +170,7 @@ func TestMetricsHandlerOutputWithPunctuation(t *testing.T) {
149170
if !strings.Contains(body, fmt.Sprintf(`authtranslator_requests_total{integration=%q} 1`, dotName)) {
150171
t.Fatalf("missing request counter for %s: %s", dotName, body)
151172
}
152-
if !strings.Contains(body, fmt.Sprintf(`authtranslator_request_duration_seconds_sum{integration=%q}`, dotName)) {
173+
if !strings.Contains(body, fmt.Sprintf(`authtranslator_upstream_roundtrip_duration_seconds_sum{integration=%q}`, dotName)) {
153174
t.Fatalf("missing duration histogram for %s: %s", dotName, body)
154175
}
155176
if !strings.Contains(body, fmt.Sprintf(`authtranslator_upstream_responses_total{integration=%q,code=%q} 1`, dotName, "202")) {
@@ -211,6 +232,17 @@ func TestCallerContext(t *testing.T) {
211232
}
212233
}
213234

235+
func TestUpstreamRoundtripStartContext(t *testing.T) {
236+
start := time.Now()
237+
ctx := WithUpstreamRoundtripStart(context.Background(), start)
238+
if got, ok := UpstreamRoundtripStart(ctx); !ok || !got.Equal(start) {
239+
t.Fatalf("expected upstream roundtrip start %v, got %v ok=%v", start, got, ok)
240+
}
241+
if _, ok := UpstreamRoundtripStart(context.Background()); ok {
242+
t.Fatal("expected missing upstream roundtrip start on background context")
243+
}
244+
}
245+
214246
func TestWritePromPlugins(t *testing.T) {
215247
Reset()
216248
mu.Lock()
@@ -260,7 +292,7 @@ func TestWritePromSkipsMalformedUpstreamKeys(t *testing.T) {
260292

261293
func TestWritePromDoesNotBlockRecordDurationForOtherIntegrations(t *testing.T) {
262294
Reset()
263-
RecordDuration("one", 100*time.Millisecond)
295+
RecordUpstreamRoundtripDuration("one", 100*time.Millisecond)
264296

265297
blockCh := make(chan struct{})
266298
started := make(chan struct{})
@@ -284,14 +316,14 @@ func TestWritePromDoesNotBlockRecordDurationForOtherIntegrations(t *testing.T) {
284316

285317
recordDone := make(chan struct{})
286318
go func() {
287-
RecordDuration("two", 50*time.Millisecond)
319+
RecordUpstreamRoundtripDuration("two", 50*time.Millisecond)
288320
close(recordDone)
289321
}()
290322

291323
select {
292324
case <-recordDone:
293325
case <-time.After(300 * time.Millisecond):
294-
t.Fatal("RecordDuration blocked while metrics response writer was blocked")
326+
t.Fatal("RecordUpstreamRoundtripDuration blocked while metrics response writer was blocked")
295327
}
296328

297329
close(blockCh)

0 commit comments

Comments
 (0)