Skip to content

Commit f70e6da

Browse files
authored
feat(tracing): add store tracing (#3001)
* feat: add sequencer tracing instrumentation Add OpenTelemetry tracing for the core Sequencer interface. This traces all three main operations: - SubmitBatchTxs: tracks tx count and batch size - GetNextBatch: tracks tx count, forced inclusion count, batch size - VerifyBatch: tracks batch data count and verification result The tracing wrapper can be used with any Sequencer implementation (single, based, etc.) via WithTracingSequencer(). * chore: using helper fn instead of having it duplicated * chore: adding da retreiver syncing * chore: bump sonic version to work with 1.25 * chore: adding tracing for da submitter * chore: adding forced inclusion tracing * chore: handle tracing internally * chore: removed duplicate tracer * chore: simplified naming * chore: add store tracing * chore: propagating context * chore: reverted ctx change
1 parent 2101f3e commit f70e6da

File tree

3 files changed

+704
-0
lines changed

3 files changed

+704
-0
lines changed

node/full.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ func newFullNode(
8181

8282
mainKV := store.NewEvNodeKVStore(database)
8383
evstore := store.New(mainKV)
84+
if nodeConfig.Instrumentation.IsTracingEnabled() {
85+
evstore = store.WithTracingStore(evstore)
86+
}
8487

8588
headerSyncService, err := initHeaderSyncService(mainKV, evstore, nodeConfig, genesis, p2pClient, logger)
8689
if err != nil {

pkg/store/tracing.go

Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
package store
2+
3+
import (
4+
"context"
5+
"encoding/hex"
6+
7+
ds "github.com/ipfs/go-datastore"
8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/codes"
11+
"go.opentelemetry.io/otel/trace"
12+
13+
"github.com/evstack/ev-node/types"
14+
)
15+
16+
var _ Store = (*tracedStore)(nil)
17+
18+
type tracedStore struct {
19+
inner Store
20+
tracer trace.Tracer
21+
}
22+
23+
// WithTracingStore wraps a Store with OpenTelemetry tracing.
24+
func WithTracingStore(inner Store) Store {
25+
return &tracedStore{
26+
inner: inner,
27+
tracer: otel.Tracer("ev-node/store"),
28+
}
29+
}
30+
31+
func (t *tracedStore) Height(ctx context.Context) (uint64, error) {
32+
ctx, span := t.tracer.Start(ctx, "Store.Height")
33+
defer span.End()
34+
35+
height, err := t.inner.Height(ctx)
36+
if err != nil {
37+
span.RecordError(err)
38+
span.SetStatus(codes.Error, err.Error())
39+
return height, err
40+
}
41+
42+
span.SetAttributes(attribute.Int64("height", int64(height)))
43+
return height, nil
44+
}
45+
46+
func (t *tracedStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) {
47+
ctx, span := t.tracer.Start(ctx, "Store.GetBlockData",
48+
trace.WithAttributes(attribute.Int64("height", int64(height))),
49+
)
50+
defer span.End()
51+
52+
header, data, err := t.inner.GetBlockData(ctx, height)
53+
if err != nil {
54+
span.RecordError(err)
55+
span.SetStatus(codes.Error, err.Error())
56+
return header, data, err
57+
}
58+
59+
return header, data, nil
60+
}
61+
62+
func (t *tracedStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) {
63+
ctx, span := t.tracer.Start(ctx, "Store.GetBlockByHash",
64+
trace.WithAttributes(attribute.String("hash", hex.EncodeToString(hash))),
65+
)
66+
defer span.End()
67+
68+
header, data, err := t.inner.GetBlockByHash(ctx, hash)
69+
if err != nil {
70+
span.RecordError(err)
71+
span.SetStatus(codes.Error, err.Error())
72+
return header, data, err
73+
}
74+
75+
if header != nil {
76+
span.SetAttributes(attribute.Int64("height", int64(header.Height())))
77+
}
78+
return header, data, nil
79+
}
80+
81+
func (t *tracedStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) {
82+
ctx, span := t.tracer.Start(ctx, "Store.GetSignature",
83+
trace.WithAttributes(attribute.Int64("height", int64(height))),
84+
)
85+
defer span.End()
86+
87+
sig, err := t.inner.GetSignature(ctx, height)
88+
if err != nil {
89+
span.RecordError(err)
90+
span.SetStatus(codes.Error, err.Error())
91+
return sig, err
92+
}
93+
94+
return sig, nil
95+
}
96+
97+
func (t *tracedStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) {
98+
ctx, span := t.tracer.Start(ctx, "Store.GetSignatureByHash",
99+
trace.WithAttributes(attribute.String("hash", hex.EncodeToString(hash))),
100+
)
101+
defer span.End()
102+
103+
sig, err := t.inner.GetSignatureByHash(ctx, hash)
104+
if err != nil {
105+
span.RecordError(err)
106+
span.SetStatus(codes.Error, err.Error())
107+
return sig, err
108+
}
109+
110+
return sig, nil
111+
}
112+
113+
func (t *tracedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) {
114+
ctx, span := t.tracer.Start(ctx, "Store.GetHeader",
115+
trace.WithAttributes(attribute.Int64("height", int64(height))),
116+
)
117+
defer span.End()
118+
119+
header, err := t.inner.GetHeader(ctx, height)
120+
if err != nil {
121+
span.RecordError(err)
122+
span.SetStatus(codes.Error, err.Error())
123+
return header, err
124+
}
125+
126+
return header, nil
127+
}
128+
129+
func (t *tracedStore) GetState(ctx context.Context) (types.State, error) {
130+
ctx, span := t.tracer.Start(ctx, "Store.GetState")
131+
defer span.End()
132+
133+
state, err := t.inner.GetState(ctx)
134+
if err != nil {
135+
span.RecordError(err)
136+
span.SetStatus(codes.Error, err.Error())
137+
return state, err
138+
}
139+
140+
span.SetAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight)))
141+
return state, nil
142+
}
143+
144+
func (t *tracedStore) GetStateAtHeight(ctx context.Context, height uint64) (types.State, error) {
145+
ctx, span := t.tracer.Start(ctx, "Store.GetStateAtHeight",
146+
trace.WithAttributes(attribute.Int64("height", int64(height))),
147+
)
148+
defer span.End()
149+
150+
state, err := t.inner.GetStateAtHeight(ctx, height)
151+
if err != nil {
152+
span.RecordError(err)
153+
span.SetStatus(codes.Error, err.Error())
154+
return state, err
155+
}
156+
157+
return state, nil
158+
}
159+
160+
func (t *tracedStore) GetMetadata(ctx context.Context, key string) ([]byte, error) {
161+
ctx, span := t.tracer.Start(ctx, "Store.GetMetadata",
162+
trace.WithAttributes(attribute.String("key", key)),
163+
)
164+
defer span.End()
165+
166+
data, err := t.inner.GetMetadata(ctx, key)
167+
if err != nil {
168+
span.RecordError(err)
169+
span.SetStatus(codes.Error, err.Error())
170+
return data, err
171+
}
172+
173+
span.SetAttributes(attribute.Int("value.size", len(data)))
174+
return data, nil
175+
}
176+
177+
func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
178+
ctx, span := t.tracer.Start(ctx, "Store.SetMetadata",
179+
trace.WithAttributes(
180+
attribute.String("key", key),
181+
attribute.Int("value.size", len(value)),
182+
),
183+
)
184+
defer span.End()
185+
186+
err := t.inner.SetMetadata(ctx, key, value)
187+
if err != nil {
188+
span.RecordError(err)
189+
span.SetStatus(codes.Error, err.Error())
190+
return err
191+
}
192+
193+
return nil
194+
}
195+
196+
func (t *tracedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
197+
ctx, span := t.tracer.Start(ctx, "Store.Rollback",
198+
trace.WithAttributes(
199+
attribute.Int64("height", int64(height)),
200+
attribute.Bool("aggregator", aggregator),
201+
),
202+
)
203+
defer span.End()
204+
205+
err := t.inner.Rollback(ctx, height, aggregator)
206+
if err != nil {
207+
span.RecordError(err)
208+
span.SetStatus(codes.Error, err.Error())
209+
return err
210+
}
211+
212+
return nil
213+
}
214+
215+
func (t *tracedStore) Close() error {
216+
return t.inner.Close()
217+
}
218+
219+
func (t *tracedStore) NewBatch(ctx context.Context) (Batch, error) {
220+
ctx, span := t.tracer.Start(ctx, "Store.NewBatch")
221+
defer span.End()
222+
223+
batch, err := t.inner.NewBatch(ctx)
224+
if err != nil {
225+
span.RecordError(err)
226+
span.SetStatus(codes.Error, err.Error())
227+
return nil, err
228+
}
229+
230+
return &tracedBatch{
231+
inner: batch,
232+
tracer: t.tracer,
233+
ctx: ctx,
234+
}, nil
235+
}
236+
237+
var _ Batch = (*tracedBatch)(nil)
238+
239+
type tracedBatch struct {
240+
inner Batch
241+
tracer trace.Tracer
242+
ctx context.Context
243+
}
244+
245+
func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error {
246+
_, span := b.tracer.Start(b.ctx, "Batch.SaveBlockData",
247+
trace.WithAttributes(attribute.Int64("height", int64(header.Height()))),
248+
)
249+
defer span.End()
250+
251+
err := b.inner.SaveBlockData(header, data, signature)
252+
if err != nil {
253+
span.RecordError(err)
254+
span.SetStatus(codes.Error, err.Error())
255+
return err
256+
}
257+
258+
return nil
259+
}
260+
261+
func (b *tracedBatch) SetHeight(height uint64) error {
262+
_, span := b.tracer.Start(b.ctx, "Batch.SetHeight",
263+
trace.WithAttributes(attribute.Int64("height", int64(height))),
264+
)
265+
defer span.End()
266+
267+
err := b.inner.SetHeight(height)
268+
if err != nil {
269+
span.RecordError(err)
270+
span.SetStatus(codes.Error, err.Error())
271+
return err
272+
}
273+
274+
return nil
275+
}
276+
277+
func (b *tracedBatch) UpdateState(state types.State) error {
278+
_, span := b.tracer.Start(b.ctx, "Batch.UpdateState",
279+
trace.WithAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight))),
280+
)
281+
defer span.End()
282+
283+
err := b.inner.UpdateState(state)
284+
if err != nil {
285+
span.RecordError(err)
286+
span.SetStatus(codes.Error, err.Error())
287+
return err
288+
}
289+
290+
return nil
291+
}
292+
293+
func (b *tracedBatch) Commit() error {
294+
_, span := b.tracer.Start(b.ctx, "Batch.Commit")
295+
defer span.End()
296+
297+
err := b.inner.Commit()
298+
if err != nil {
299+
span.RecordError(err)
300+
span.SetStatus(codes.Error, err.Error())
301+
return err
302+
}
303+
304+
return nil
305+
}
306+
307+
func (b *tracedBatch) Put(key ds.Key, value []byte) error {
308+
_, span := b.tracer.Start(b.ctx, "Batch.Put",
309+
trace.WithAttributes(
310+
attribute.String("key", key.String()),
311+
attribute.Int("value.size", len(value)),
312+
),
313+
)
314+
defer span.End()
315+
316+
err := b.inner.Put(key, value)
317+
if err != nil {
318+
span.RecordError(err)
319+
span.SetStatus(codes.Error, err.Error())
320+
return err
321+
}
322+
323+
return nil
324+
}
325+
326+
func (b *tracedBatch) Delete(key ds.Key) error {
327+
_, span := b.tracer.Start(b.ctx, "Batch.Delete",
328+
trace.WithAttributes(attribute.String("key", key.String())),
329+
)
330+
defer span.End()
331+
332+
err := b.inner.Delete(key)
333+
if err != nil {
334+
span.RecordError(err)
335+
span.SetStatus(codes.Error, err.Error())
336+
return err
337+
}
338+
339+
return nil
340+
}

0 commit comments

Comments
 (0)