Skip to content

Commit ee1f465

Browse files
anrs and DuodenumL authored
Iops (#101)
* support iops monitoring
* fix: lint failed

Co-authored-by: DuodenumL <qq2410088750@live.com>
1 parent 3f243dd commit ee1f465

7 files changed

Lines changed: 348 additions & 5 deletions

File tree

runtime/docker/blkio_metrics.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package docker
2+
3+
import (
4+
enginetypes "github.com/docker/docker/api/types"
5+
6+
"github.com/projecteru2/agent/utils"
7+
)
8+
9+
// Operation names matched against the Op field of raw blkio stat entries.
const (
	// ReadOp is the Op value of entries that account for reads.
	ReadOp = "Read"
	// WriteOp is the Op value of entries that account for writes.
	WriteOp = "Write"
)
13+
14+
// per device level
15+
type BlkIOMetrics struct {
16+
IOServiceBytesReadRecursive []*BlkIOEntry
17+
IOServiceBytesWriteRecursive []*BlkIOEntry
18+
IOServicedReadRecusive []*BlkIOEntry
19+
IOServicedWriteRecusive []*BlkIOEntry
20+
}
21+
22+
// BlkIOEntry is a single block I/O counter sample for one device.
type BlkIOEntry struct {
	// Dev is the device path the counter belongs to.
	Dev string
	// Value is the counter value reported for Dev.
	Value uint64
}
26+
27+
func fromEngineBlkioStats(raw *enginetypes.BlkioStats) (*BlkIOMetrics, error) {
28+
blkioMetrics := &BlkIOMetrics{}
29+
for _, entry := range raw.IoServiceBytesRecursive {
30+
devPath, err := utils.GetDevicePath(entry.Major, entry.Minor)
31+
if err != nil {
32+
return nil, err
33+
}
34+
switch entry.Op {
35+
case ReadOp:
36+
blkioMetrics.IOServiceBytesReadRecursive = append(blkioMetrics.IOServiceBytesReadRecursive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
37+
case WriteOp:
38+
blkioMetrics.IOServiceBytesWriteRecursive = append(blkioMetrics.IOServiceBytesWriteRecursive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
39+
}
40+
}
41+
for _, entry := range raw.IoServicedRecursive {
42+
devPath, err := utils.GetDevicePath(entry.Major, entry.Minor)
43+
if err != nil {
44+
return nil, err
45+
}
46+
switch entry.Op {
47+
case ReadOp:
48+
blkioMetrics.IOServicedReadRecusive = append(blkioMetrics.IOServicedReadRecusive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
49+
case WriteOp:
50+
blkioMetrics.IOServicedWriteRecusive = append(blkioMetrics.IOServicedWriteRecusive, &BlkIOEntry{Dev: devPath, Value: entry.Value})
51+
}
52+
}
53+
return blkioMetrics, nil
54+
}
55+
56+
// getBlkIOMetricsDifference calculate differences between old and new metrics (new-old), for missing metrics, will use default 0 as value
57+
func getBlkIOMetricsDifference(old *BlkIOMetrics, new *BlkIOMetrics) (diff *BlkIOMetrics) {
58+
return &BlkIOMetrics{
59+
IOServiceBytesReadRecursive: getGroupDifference(old.IOServiceBytesReadRecursive, new.IOServiceBytesReadRecursive),
60+
IOServiceBytesWriteRecursive: getGroupDifference(old.IOServiceBytesWriteRecursive, new.IOServiceBytesWriteRecursive),
61+
IOServicedReadRecusive: getGroupDifference(old.IOServicedReadRecusive, new.IOServicedReadRecusive),
62+
IOServicedWriteRecusive: getGroupDifference(old.IOServicedWriteRecusive, new.IOServicedWriteRecusive),
63+
}
64+
}
65+
66+
func getGroupDifference(old []*BlkIOEntry, new []*BlkIOEntry) (diff []*BlkIOEntry) {
67+
lookup := func(dev string, entryList []*BlkIOEntry) uint64 {
68+
for _, entry := range entryList {
69+
if entry.Dev == dev {
70+
return entry.Value
71+
}
72+
}
73+
return 0
74+
}
75+
for _, entry := range new {
76+
diffEntry := &BlkIOEntry{
77+
Dev: entry.Dev,
78+
Value: entry.Value - lookup(entry.Dev, old),
79+
}
80+
diff = append(diff, diffEntry)
81+
}
82+
return
83+
}

runtime/docker/docker.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,29 @@ func (d *Docker) LogFieldsExtra(ctx context.Context, ID string) (map[string]stri
331331
return extra, nil
332332
}
333333

334+
func (d *Docker) getContainerStats(ctx context.Context, ID string) (*enginetypes.StatsJSON, error) {
335+
rawStat, err := d.client.ContainerStatsOneShot(ctx, ID)
336+
if err != nil {
337+
log.Errorf("[getContainerStats] failed to get container %s stats, err: %v", ID, err)
338+
return nil, err
339+
}
340+
b, err := io.ReadAll(rawStat.Body)
341+
if err != nil {
342+
log.Errorf("[getContainerStats] failed to read container %s stats, err: %v", ID, err)
343+
return nil, err
344+
}
345+
stats := &enginetypes.StatsJSON{}
346+
return stats, json.Unmarshal(b, stats)
347+
}
348+
349+
func (d *Docker) getBlkioStats(ctx context.Context, ID string) (*enginetypes.BlkioStats, error) {
350+
fullStat, err := d.getContainerStats(ctx, ID)
351+
if err != nil {
352+
return nil, err
353+
}
354+
return &fullStat.BlkioStats, nil
355+
}
356+
334357
// IsDaemonRunning returns if the runtime daemon is running.
335358
func (d *Docker) IsDaemonRunning(ctx context.Context) bool {
336359
var err error

runtime/docker/metrics.go

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,17 @@ type MetricsClient struct {
4242
errOut *prometheus.GaugeVec
4343
dropIn *prometheus.GaugeVec
4444
dropOut *prometheus.GaugeVec
45+
46+
// diskio stats
47+
ioServiceBytesRead *prometheus.GaugeVec
48+
ioServiceBytesWrite *prometheus.GaugeVec
49+
ioServicedRead *prometheus.GaugeVec
50+
ioServicedWrite *prometheus.GaugeVec
51+
// io/byte per second
52+
ioServiceBytesReadPerSecond *prometheus.GaugeVec
53+
ioServiceBytesWritePerSecond *prometheus.GaugeVec
54+
ioServicedReadPerSecond *prometheus.GaugeVec
55+
ioServicedWritePerSecond *prometheus.GaugeVec
4556
}
4657

4758
var clients sync.Map
@@ -67,7 +78,6 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
6778
"orchestrator": cluster.ERUMark,
6879
"labels": strings.Join(clables, ","),
6980
}
70-
7181
cpuHostUsage := prometheus.NewGauge(prometheus.GaugeOpts{
7282
Name: "cpu_host_usage",
7383
Help: "cpu usage in host view.",
@@ -163,7 +173,46 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
163173
Help: "drop out.",
164174
ConstLabels: labels,
165175
}, []string{"nic"})
166-
176+
ioServiceBytesRead := prometheus.NewGaugeVec(prometheus.GaugeOpts{
177+
Name: "io_service_bytes_read",
178+
Help: "number of bytes read to the disk by the group.",
179+
ConstLabels: labels,
180+
}, []string{"dev"})
181+
ioServiceBytesWrite := prometheus.NewGaugeVec(prometheus.GaugeOpts{
182+
Name: "io_service_bytes_write",
183+
Help: "number of bytes write to the disk by the group.",
184+
ConstLabels: labels,
185+
}, []string{"dev"})
186+
ioServicedRead := prometheus.NewGaugeVec(prometheus.GaugeOpts{
187+
Name: "io_serviced_read",
188+
Help: "number of read IOs to the disk by the group.",
189+
ConstLabels: labels,
190+
}, []string{"dev"})
191+
ioServicedWrite := prometheus.NewGaugeVec(prometheus.GaugeOpts{
192+
Name: "io_serviced_write",
193+
Help: "number of write IOs to the disk by the group.",
194+
ConstLabels: labels,
195+
}, []string{"dev"})
196+
ioServiceBytesReadPerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
197+
Name: "io_service_bytes_read_per_second",
198+
Help: "number of bytes read per second to the disk by the group.",
199+
ConstLabels: labels,
200+
}, []string{"dev"})
201+
ioServiceBytesWritePerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
202+
Name: "io_service_bytes_write_per_second",
203+
Help: "number of bytes write per second to the disk by the group.",
204+
ConstLabels: labels,
205+
}, []string{"dev"})
206+
ioServicedReadPerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
207+
Name: "io_serviced_read_per_second",
208+
Help: "number of read IOs per second to the disk by the group.",
209+
ConstLabels: labels,
210+
}, []string{"dev"})
211+
ioServicedWritePerSecond := prometheus.NewGaugeVec(prometheus.GaugeOpts{
212+
Name: "io_serviced_write_per_second",
213+
Help: "number of write IOs per second to the disk by the group.",
214+
ConstLabels: labels,
215+
}, []string{"dev"})
167216
// TODO 这里已经没有了版本了
168217
tag := fmt.Sprintf("%s.%s", hostname, coreutils.ShortID(container.ID))
169218
endpoint := fmt.Sprintf("%s.%s", container.Name, container.EntryPoint)
@@ -174,7 +223,7 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
174223
cpuContainerSysUsage, cpuContainerUsage, cpuContainerUserUsage,
175224
memMaxUsage, memRss, memUsage, memPercent, memRSSPercent,
176225
bytesRecv, bytesSent, packetsRecv, packetsSent,
177-
errIn, errOut, dropIn, dropOut,
226+
errIn, errOut, dropIn, dropOut, ioServiceBytesRead, ioServiceBytesWrite, ioServicedRead, ioServicedWrite, ioServiceBytesReadPerSecond, ioServiceBytesWritePerSecond, ioServicedReadPerSecond, ioServicedWritePerSecond,
178227
)
179228

180229
metricsClient := &MetricsClient{
@@ -204,6 +253,16 @@ func NewMetricsClient(statsd, hostname string, container *Container) *MetricsCli
204253
errOut: errOut,
205254
dropIn: dropIn,
206255
dropOut: dropOut,
256+
257+
ioServiceBytesRead: ioServiceBytesRead,
258+
ioServiceBytesWrite: ioServiceBytesWrite,
259+
ioServicedRead: ioServicedRead,
260+
ioServicedWrite: ioServicedWrite,
261+
262+
ioServiceBytesReadPerSecond: ioServiceBytesReadPerSecond,
263+
ioServiceBytesWritePerSecond: ioServiceBytesWritePerSecond,
264+
ioServicedReadPerSecond: ioServicedReadPerSecond,
265+
ioServicedWritePerSecond: ioServicedWritePerSecond,
207266
}
208267
clients.Store(container.ID, metricsClient)
209268
return metricsClient
@@ -233,6 +292,16 @@ func (m *MetricsClient) Unregister() {
233292
prometheus.Unregister(m.errOut)
234293
prometheus.Unregister(m.dropIn)
235294
prometheus.Unregister(m.dropOut)
295+
296+
prometheus.Unregister(m.ioServiceBytesRead)
297+
prometheus.Unregister(m.ioServiceBytesWrite)
298+
prometheus.Unregister(m.ioServicedRead)
299+
prometheus.Unregister(m.ioServicedWrite)
300+
301+
prometheus.Unregister(m.ioServiceBytesReadPerSecond)
302+
prometheus.Unregister(m.ioServiceBytesWritePerSecond)
303+
prometheus.Unregister(m.ioServicedReadPerSecond)
304+
prometheus.Unregister(m.ioServicedWritePerSecond)
236305
}
237306

238307
// CPUHostUsage set cpu usage in host view
@@ -349,6 +418,54 @@ func (m *MetricsClient) DropOut(nic string, i float64) {
349418
m.dropOut.WithLabelValues(nic).Set(i)
350419
}
351420

421+
// IOServiceBytesRead .
422+
func (m *MetricsClient) IOServiceBytesRead(dev string, i float64) {
423+
m.data[dev+".io_service_bytes_read"] = i
424+
m.ioServiceBytesRead.WithLabelValues(dev).Set(i)
425+
}
426+
427+
// IOServiceBytesWrite .
428+
func (m *MetricsClient) IOServiceBytesWrite(dev string, i float64) {
429+
m.data[dev+".io_service_bytes_write"] = i
430+
m.ioServiceBytesWrite.WithLabelValues(dev).Set(i)
431+
}
432+
433+
// IOServicedRead .
434+
func (m *MetricsClient) IOServicedRead(dev string, i float64) {
435+
m.data[dev+".io_serviced_read"] = i
436+
m.ioServicedRead.WithLabelValues(dev).Set(i)
437+
}
438+
439+
// IOServicedWrite .
440+
func (m *MetricsClient) IOServicedWrite(dev string, i float64) {
441+
m.data[dev+".io_serviced_write"] = i
442+
m.ioServicedWrite.WithLabelValues(dev).Set(i)
443+
}
444+
445+
// IOServiceBytesReadPerSecond .
446+
func (m *MetricsClient) IOServiceBytesReadPerSecond(dev string, i float64) {
447+
m.data[dev+".io_service_bytes_read_per_second"] = i
448+
m.ioServiceBytesReadPerSecond.WithLabelValues(dev).Set(i)
449+
}
450+
451+
// IOServiceBytesWritePerSecond .
452+
func (m *MetricsClient) IOServiceBytesWritePerSecond(dev string, i float64) {
453+
m.data[dev+".io_service_bytes_write_per_second"] = i
454+
m.ioServiceBytesWritePerSecond.WithLabelValues(dev).Set(i)
455+
}
456+
457+
// IOServicedReadPerSecond .
458+
func (m *MetricsClient) IOServicedReadPerSecond(dev string, i float64) {
459+
m.data[dev+".io_serviced_read_per_second"] = i
460+
m.ioServicedReadPerSecond.WithLabelValues(dev).Set(i)
461+
}
462+
463+
// IOServicedWritePerSecond .
464+
func (m *MetricsClient) IOServicedWritePerSecond(dev string, i float64) {
465+
m.data[dev+".io_serviced_write_per_second"] = i
466+
m.ioServicedWritePerSecond.WithLabelValues(dev).Set(i)
467+
}
468+
352469
// Lazy connecting
353470
func (m *MetricsClient) checkConn() error {
354471
if m.statsdClient != nil {

runtime/docker/stat.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
// CollectWorkloadMetrics .
15-
func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) {
15+
func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) { //nolint
1616
// TODO
1717
// FIXME fuck internal pkg
1818
proc := "/proc"
@@ -31,7 +31,16 @@ func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) {
3131
log.Errorf("[stat] get %s stats failed %v", container.ID, err)
3232
return
3333
}
34-
34+
rawBlkioStats, err := d.getBlkioStats(ctx, container.ID)
35+
if err != nil {
36+
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
37+
return
38+
}
39+
blkioStats, err := fromEngineBlkioStats(rawBlkioStats)
40+
if err != nil {
41+
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
42+
return
43+
}
3544
delta := float64(d.config.Metrics.Step)
3645
timeout := time.Duration(d.config.Metrics.Step) * time.Second
3746
tick := time.NewTicker(timeout)
@@ -128,6 +137,44 @@ func (d *Docker) CollectWorkloadMetrics(ctx context.Context, ID string) {
128137
mClient.DropIn(nic.Name, float64(nic.Dropin-oldNICStats.Dropin)/delta)
129138
mClient.DropOut(nic.Name, float64(nic.Dropout-oldNICStats.Dropout)/delta)
130139
}
140+
log.Debugf("[stat] start to get blkio stats for %s", container.ID)
141+
newRawBlkioStats, err := d.getBlkioStats(ctx, container.ID)
142+
if err != nil {
143+
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
144+
return
145+
}
146+
newBlkioStats, err := fromEngineBlkioStats(newRawBlkioStats)
147+
if err != nil {
148+
log.Errorf("[stat] get %s diskio stats failed %v", container.ID, err)
149+
return
150+
}
151+
for _, entry := range newBlkioStats.IOServiceBytesReadRecursive {
152+
mClient.IOServiceBytesRead(entry.Dev, float64(entry.Value))
153+
}
154+
for _, entry := range newBlkioStats.IOServiceBytesWriteRecursive {
155+
mClient.IOServiceBytesWrite(entry.Dev, float64(entry.Value))
156+
}
157+
for _, entry := range newBlkioStats.IOServicedReadRecusive {
158+
mClient.IOServicedRead(entry.Dev, float64(entry.Value))
159+
}
160+
for _, entry := range newBlkioStats.IOServicedWriteRecusive {
161+
mClient.IOServicedWrite(entry.Dev, float64(entry.Value))
162+
}
163+
// update diff
164+
diffBlkioStats := getBlkIOMetricsDifference(blkioStats, newBlkioStats)
165+
for _, entry := range diffBlkioStats.IOServiceBytesReadRecursive {
166+
mClient.IOServiceBytesReadPerSecond(entry.Dev, float64(entry.Value)/delta)
167+
}
168+
for _, entry := range diffBlkioStats.IOServiceBytesWriteRecursive {
169+
mClient.IOServiceBytesWritePerSecond(entry.Dev, float64(entry.Value)/delta)
170+
}
171+
for _, entry := range diffBlkioStats.IOServicedReadRecusive {
172+
mClient.IOServicedReadPerSecond(entry.Dev, float64(entry.Value)/delta)
173+
}
174+
for _, entry := range diffBlkioStats.IOServicedWriteRecusive {
175+
mClient.IOServicedWritePerSecond(entry.Dev, float64(entry.Value)/delta)
176+
}
177+
rawBlkioStats, blkioStats = newRawBlkioStats, newBlkioStats
131178
containerCPUStats, systemCPUStats, containerNetStats = newContainerCPUStats, newSystemCPUStats, newContainerNetStats
132179
if err := mClient.Send(); err != nil {
133180
log.Errorf("[stat] Send metrics failed %v", err)

0 commit comments

Comments
 (0)