Skip to content

Commit 52fa0a8

Browse files
Shivaji KharseShivaji Kharse
authored andcommitted
resolve review comments
1 parent 69debb4 commit 52fa0a8

3 files changed

Lines changed: 76 additions & 129 deletions

File tree

dgraph/cmd/bulk/reduce.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ func (r *reducer) startWriting(ci *countIndexer, vi *vectorIndexer, writerCh cha
410410
if err := req.vectorBuf.SliceIterate(func(slice []byte) error {
411411
ve := unmarshalVectorEntry(slice)
412412
if ve == nil {
413-
// Skip malformed entries (already logged in unmarshalVectorEntry)
413+
vi.handleUnmarshalError()
414414
return nil
415415
}
416416
// Insert vector into HNSW and generate entries

dgraph/cmd/bulk/vector_indexer.go

Lines changed: 73 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"math"
1313
"sync"
14+
"sync/atomic"
1415

1516
"github.com/dgraph-io/badger/v4"
1617
bpb "github.com/dgraph-io/badger/v4/pb"
@@ -19,6 +20,7 @@ import (
1920
"github.com/dgraph-io/dgraph/v25/tok"
2021
"github.com/dgraph-io/dgraph/v25/tok/hnsw"
2122
"github.com/dgraph-io/dgraph/v25/tok/index"
23+
"github.com/dgraph-io/dgraph/v25/x"
2224
"github.com/golang/glog"
2325
)
2426

@@ -55,6 +57,32 @@ type vectorEntry struct {
5557
vector []float32
5658
}
5759

60+
// handleError handles errors consistently with the bulk loader's error handling mechanism.
61+
// It increments the error count, logs to the error log if enabled, and terminates
62+
// the process if --ignore_errors is false.
63+
func (vi *vectorIndexer) handleError(err error, context string, filename string) {
64+
if vi.state == nil {
65+
glog.Errorf("vector indexer error (no state): %v [%s]", err, context)
66+
return
67+
}
68+
atomic.AddInt64(&vi.state.prog.errCount, 1)
69+
if vi.state.errorLog != nil {
70+
vi.state.errorLog.Log(filename, err, context)
71+
}
72+
glog.Errorf("vector indexer error: %v [%s]", err, context)
73+
if !vi.state.opt.IgnoreErrors {
74+
x.Check(err)
75+
}
76+
}
77+
78+
func (vi *vectorIndexer) handleUnmarshalError() {
79+
vi.handleError(
80+
fmt.Errorf("failed to unmarshal vector entry"),
81+
"malformed_data",
82+
"<vector_unmarshal>",
83+
)
84+
}
85+
5886
// vectorEntrySize calculates the size needed to marshal a vectorEntry.
5987
func vectorEntrySize(ve *vectorEntry) int {
6088
// pred length (4) + pred bytes + uid (8) + vector length (4) + vector data
@@ -304,7 +332,11 @@ func (vi *vectorIndexer) validateVectorDimension(pred string, vector []float32,
304332

305333
// Empty vectors are invalid
306334
if len(vector) == 0 {
307-
glog.Warningf("Empty vector for predicate %s uid %d, skipping", pred, uid)
335+
vi.handleError(
336+
fmt.Errorf("empty vector for predicate %s uid %d", pred, uid),
337+
fmt.Sprintf("predicate=%s uid=%d", pred, uid),
338+
"<vector_validation>",
339+
)
308340
return false
309341
}
310342

@@ -317,9 +349,11 @@ func (vi *vectorIndexer) validateVectorDimension(pred string, vector []float32,
317349
}
318350

319351
if len(vector) != expectedDim {
320-
321-
glog.Errorf("Dimension mismatch for predicate %s uid %d: expected %d, got %d",
322-
pred, uid, expectedDim, len(vector))
352+
vi.handleError(
353+
fmt.Errorf("dimension mismatch: expected %d, got %d", expectedDim, len(vector)),
354+
fmt.Sprintf("predicate=%s uid=%d expected_dim=%d actual_dim=%d", pred, uid, expectedDim, len(vector)),
355+
"<vector_validation>",
356+
)
323357
return false
324358
}
325359

@@ -331,16 +365,28 @@ func (vi *vectorIndexer) validateVectorDimension(pred string, vector []float32,
331365
// It validates the vector dimension, creates indexer lazily, and handles errors gracefully.
332366
func (vi *vectorIndexer) addVectorEntry(ve *vectorEntry) {
333367
if ve == nil {
334-
glog.Errorf("addVectorEntry: received nil vectorEntry, skipping")
368+
vi.handleError(
369+
fmt.Errorf("received nil vectorEntry"),
370+
"nil_entry",
371+
"<vector_indexer>",
372+
)
335373
return
336374
}
337375

338376
if ve.uid == 0 {
339-
glog.Errorf("addVectorEntry: INVALID UID=0 for pred=%s, skipping", ve.pred)
377+
vi.handleError(
378+
fmt.Errorf("invalid UID=0 for predicate %s", ve.pred),
379+
fmt.Sprintf("predicate=%s uid=0", ve.pred),
380+
"<vector_validation>",
381+
)
340382
return
341383
}
342384
if ve.uid == math.MaxUint64 {
343-
glog.Errorf("addVectorEntry: INVALID UID=MaxUint64 for pred=%s, skipping", ve.pred)
385+
vi.handleError(
386+
fmt.Errorf("invalid UID=MaxUint64 for predicate %s", ve.pred),
387+
fmt.Sprintf("predicate=%s uid=MaxUint64", ve.pred),
388+
"<vector_validation>",
389+
)
344390
return
345391
}
346392

@@ -351,7 +397,10 @@ func (vi *vectorIndexer) addVectorEntry(ve *vectorEntry) {
351397

352398
indexer, tc, err := vi.getOrCreateIndexer(ve.pred)
353399
if err != nil {
354-
glog.Errorf("Error getting indexer for %s: %v", ve.pred, err)
400+
vi.handleError(err,
401+
fmt.Sprintf("predicate=%s uid=%d", ve.pred, ve.uid),
402+
"<vector_indexer>",
403+
)
355404
return
356405
}
357406

@@ -363,10 +412,13 @@ func (vi *vectorIndexer) addVectorEntry(ve *vectorEntry) {
363412
if existingShard != vi.shardId {
364413
// This is a serious invariant violation - same predicate in multiple shards
365414
// This could lead to data corruption as HNSW graph would be split
366-
glog.Errorf("INVARIANT VIOLATION: predicate %s already assigned to shard %d, "+
367-
"but shard %d also received vectors for it. Data may be corrupted!",
368-
ve.pred, existingShard, vi.shardId)
369415
vi.predToShardMu.Unlock()
416+
vi.handleError(
417+
fmt.Errorf("predicate %s already assigned to shard %d, but shard %d also received vectors",
418+
ve.pred, existingShard, vi.shardId),
419+
fmt.Sprintf("predicate=%s existing_shard=%d current_shard=%d", ve.pred, existingShard, vi.shardId),
420+
"<vector_invariant>",
421+
)
370422
return // Skip this vector to prevent further corruption
371423
}
372424
} else {
@@ -379,7 +431,10 @@ func (vi *vectorIndexer) addVectorEntry(ve *vectorEntry) {
379431
// Insert into HNSW index
380432
_, err = indexer.Insert(context.Background(), tc, ve.uid, ve.vector)
381433
if err != nil {
382-
glog.Errorf("Error inserting vector for %s uid %d: %v", ve.pred, ve.uid, err)
434+
vi.handleError(err,
435+
fmt.Sprintf("predicate=%s uid=%d", ve.pred, ve.uid),
436+
"<vector_insert>",
437+
)
383438
return
384439
}
385440
}
@@ -389,7 +444,7 @@ func (vi *vectorIndexer) addVectorEntry(ve *vectorEntry) {
389444
func (vi *vectorIndexer) wait() {
390445
// Flush any remaining vector posting list writes
391446
if err := vi.flushWriteBatch(); err != nil {
392-
glog.Errorf("Error flushing write batch: %v", err)
447+
vi.handleError(err, fmt.Sprintf("shard=%d", vi.shardId), "<vector_flush>")
393448
}
394449

395450
vi.mu.Lock()
@@ -410,13 +465,16 @@ func (vi *vectorIndexer) wait() {
410465
// Update moves mutations from posting lists to delta cache
411466
txn.Update()
412467
if err := txn.CommitToDisk(writer, vi.state.writeTs); err != nil {
413-
glog.Errorf("Error committing HNSW data for predicate %s: %v", pred, err)
468+
vi.handleError(err,
469+
fmt.Sprintf("predicate=%s shard=%d", pred, vi.shardId),
470+
"<vector_commit>",
471+
)
414472
} else {
415473
glog.Infof("Committed HNSW data for predicate %s to tmpDb (shard %d)", pred, vi.shardId)
416474
}
417475
}
418476
if err := writer.Flush(); err != nil {
419-
glog.Errorf("Error flushing HNSW data to tmpDb: %v", err)
477+
vi.handleError(err, fmt.Sprintf("shard=%d", vi.shardId), "<vector_commit>")
420478
}
421479

422480
glog.Infof("Vector indexer wait completed for shard %d with %d predicates", vi.shardId, len(vi.vectorPreds))

0 commit comments

Comments
 (0)