Skip to content

Commit 13fd5f3

Browse files
committed
feat(data): coalesce position deletes into range inserts
Add ForEachPositionDelete (the C++ equivalent of Java's PositionDeleteRangeConsumer) and route DeleteLoader through it, replacing the per-position PositionDeleteIndex::Delete(pos) call. The function sniffs a 1024-position prefix and dispatches to either run coalescing (CRoaring addRange) or bulk addMany grouped by high-32-bit key. Also rework DeleteLoader::LoadPositionDelete to read Arrow batches via nanoarrow's ArrowArrayView directly. When the delete file's referenced_data_file matches the target (V2 writer hint), positions are passed as a zero-copy span; otherwise a per-batch staging vector filters by path. Local microbenchmarks: 2.2x-10.6x for ForEachPositionDelete and 2.1x-2.5x end-to-end through LoadPositionDeletes. Equivalent of apache/iceberg#16052.
1 parent ff42229 commit 13fd5f3

12 files changed

Lines changed: 480 additions & 18 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ set(ICEBERG_DATA_SOURCES
165165
data/position_delete_writer.cc
166166
data/writer.cc
167167
deletes/position_delete_index.cc
168+
deletes/position_delete_range_consumer.cc
168169
deletes/roaring_position_bitmap.cc
169170
puffin/file_metadata.cc
170171
puffin/json_serde.cc

src/iceberg/data/delete_loader.cc

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,21 @@
1919

2020
#include "iceberg/data/delete_loader.h"
2121

22+
#include <cstring>
23+
#include <span>
2224
#include <string>
2325
#include <vector>
2426

27+
#include <nanoarrow/nanoarrow.h>
28+
29+
#include "iceberg/arrow/nanoarrow_status_internal.h"
2530
#include "iceberg/arrow_c_data_guard_internal.h"
2631
#include "iceberg/deletes/position_delete_index.h"
32+
#include "iceberg/deletes/position_delete_range_consumer.h"
2733
#include "iceberg/file_reader.h"
2834
#include "iceberg/manifest/manifest_entry.h"
2935
#include "iceberg/metadata_columns.h"
36+
#include "iceberg/result.h"
3037
#include "iceberg/row/arrow_array_wrapper.h"
3138
#include "iceberg/schema.h"
3239
#include "iceberg/util/macros.h"
@@ -57,6 +64,24 @@ Result<std::unique_ptr<Reader>> OpenDeleteFile(const DataFile& file,
5764
return ReaderFactoryRegistry::Open(file.file_format, options);
5865
}
5966

67+
/// Raw `int64` values buffer (offset-adjusted). Skips the validity bitmap:
68+
/// `kDeleteFilePos` is required by the V2 spec.
69+
const int64_t* Int64ValuesBuffer(const ArrowArrayView* view) {
70+
return view->buffer_views[1].data.as_int64 + view->offset;
71+
}
72+
73+
/// String-equals at `row_idx` via nanoarrow's unsafe direct-buffer access.
74+
/// Skips the validity bitmap: `kDeleteFilePath` is required by the V2 spec.
75+
bool StringEquals(const ArrowArrayView* view, int64_t row_idx,
76+
std::string_view target) {
77+
ArrowStringView sv = ArrowArrayViewGetStringUnsafe(view, row_idx);
78+
if (static_cast<size_t>(sv.size_bytes) != target.size()) {
79+
return false;
80+
}
81+
return target.empty() ||
82+
std::memcmp(sv.data, target.data(), target.size()) == 0;
83+
}
84+
6085
} // namespace
6186

6287
DeleteLoader::DeleteLoader(std::shared_ptr<FileIO> io) : io_(std::move(io)) {}
@@ -71,30 +96,60 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde
7196
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
7297
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
7398

99+
// Reused across batches; reads child buffers directly to avoid the
100+
// per-row `Scalar` dispatch in `ArrowArrayStructLike`.
101+
ArrowArrayView array_view;
102+
internal::ArrowArrayViewGuard view_guard(&array_view);
103+
ArrowError error;
104+
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
105+
ArrowArrayViewInitFromSchema(&array_view, &arrow_schema, &error), error);
106+
107+
// Fast path when the writer's `referenced_data_file` hint matches our
108+
// target: skip the path column, hand `pos_data` straight to
109+
// `ForEachPositionDelete`. Trusts the hint -- spec-compliant writers
110+
// only set it when all rows share one data file.
111+
const bool use_referenced_data_file_fast_path =
112+
file.referenced_data_file.has_value() &&
113+
file.referenced_data_file.value() == data_file_path;
114+
115+
// Filter-path staging buffer; reused across batches via `clear()`.
116+
std::vector<int64_t> positions;
117+
74118
while (true) {
75119
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
76120
if (!batch_opt.has_value()) break;
77121

78122
auto& batch = batch_opt.value();
79123
internal::ArrowArrayGuard batch_guard(&batch);
80124

81-
ICEBERG_ASSIGN_OR_RAISE(
82-
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
125+
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
126+
ArrowArrayViewSetArray(&array_view, &batch, &error), error);
83127

84-
for (int64_t i = 0; i < batch.length; ++i) {
85-
if (i > 0) {
86-
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
87-
}
88-
// Field 0: file_path
89-
ICEBERG_ASSIGN_OR_RAISE(auto path_scalar, row->GetField(0));
90-
auto path = std::get<std::string_view>(path_scalar);
91-
92-
if (path == data_file_path) {
93-
// Field 1: pos
94-
ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, row->GetField(1));
95-
index.Delete(std::get<int64_t>(pos_scalar));
128+
const int64_t length = batch.length;
129+
if (length <= 0) {
130+
continue;
131+
}
132+
133+
// Child indices must match `PosDeleteSchema()`: 0 = file_path, 1 = pos.
134+
const ArrowArrayView* pos_view = array_view.children[1];
135+
const int64_t* pos_data = Int64ValuesBuffer(pos_view);
136+
137+
if (use_referenced_data_file_fast_path) {
138+
ForEachPositionDelete(std::span<const int64_t>(pos_data, length), index);
139+
continue;
140+
}
141+
142+
const ArrowArrayView* path_view = array_view.children[0];
143+
positions.clear();
144+
if (positions.capacity() < static_cast<size_t>(length)) {
145+
positions.reserve(static_cast<size_t>(length));
146+
}
147+
for (int64_t i = 0; i < length; ++i) {
148+
if (StringEquals(path_view, i, data_file_path)) {
149+
positions.push_back(pos_data[i]);
96150
}
97151
}
152+
ForEachPositionDelete(positions, index);
98153
}
99154

100155
return reader->Close();

src/iceberg/deletes/position_delete_index.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,9 @@ void PositionDeleteIndex::Merge(const PositionDeleteIndex& other) {
3939
bitmap_.Or(other.bitmap_);
4040
}
4141

42+
void PositionDeleteIndex::BulkAddForKey(int32_t key, const uint32_t* positions,
43+
size_t n) {
44+
bitmap_.AddManyForKey(key, positions, n);
45+
}
46+
4247
} // namespace iceberg

src/iceberg/deletes/position_delete_index.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,19 @@
2424

2525
#include <cstdint>
2626
#include <memory>
27+
#include <span>
2728

2829
#include "iceberg/deletes/roaring_position_bitmap.h"
2930
#include "iceberg/iceberg_data_export.h"
3031

3132
namespace iceberg {
3233

33-
/// \brief Tracks deleted row positions using a bitmap.
34+
/// \brief Tracks deleted row positions for an Iceberg MOR data file.
35+
/// Positions are 0-based row indices.
3436
///
35-
/// This class provides a domain-specific API for position deletes
36-
/// in Iceberg MOR (merge-on-read) tables. Positions are 0-based
37-
/// row indices within a data file.
37+
/// \note Not thread-safe. Callers must externally serialize every access
38+
/// -- including read-only methods -- whenever any thread might mutate
39+
/// the instance. Distinct instances are independent.
3840
class ICEBERG_DATA_EXPORT PositionDeleteIndex {
3941
public:
4042
PositionDeleteIndex() = default;
@@ -65,6 +67,14 @@ class ICEBERG_DATA_EXPORT PositionDeleteIndex {
6567
void Merge(const PositionDeleteIndex& other);
6668

6769
private:
70+
// Bulk-add `n` positions sharing high-32-bit `key`. Private hook for
71+
// `ForEachPositionDelete`'s bulk path; keeps `Delete` the sole public
72+
// mutation surface.
73+
void BulkAddForKey(int32_t key, const uint32_t* positions, size_t n);
74+
75+
friend void ForEachPositionDelete(std::span<const int64_t> positions,
76+
PositionDeleteIndex& target);
77+
6878
RoaringPositionBitmap bitmap_;
6979
};
7080

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/deletes/position_delete_range_consumer.h"
21+
22+
#include <algorithm>
23+
#include <cstdint>
24+
#include <span>
25+
#include <vector>
26+
27+
#include "iceberg/deletes/position_delete_index.h"
28+
#include "iceberg/deletes/roaring_position_bitmap.h"
29+
30+
namespace iceberg {
31+
32+
namespace {
33+
34+
bool IsValidPosition(int64_t pos) {
35+
return pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition;
36+
}
37+
38+
// Unsigned subtraction so negative or wrap-around input can't
39+
// false-positive via signed overflow.
40+
bool IsAdjacent(int64_t prev, int64_t next) {
41+
return (static_cast<uint64_t>(next) - static_cast<uint64_t>(prev)) == 1;
42+
}
43+
44+
// `RoaringPositionBitmap` shards positions by their high 32 bits; the
45+
// bulk path groups by this key before flushing via `BulkAddForKey`.
46+
int32_t HighKeyFromPosition(int64_t pos) {
47+
return static_cast<int32_t>(pos >> 32);
48+
}
49+
50+
// Emit `[range_start, last_position]`, collapsing singletons. Callers
51+
// pre-filter via `IsValidPosition`, so `last_position + 1` cannot overflow.
52+
void EmitRange(PositionDeleteIndex& target, int64_t range_start,
53+
int64_t last_position) {
54+
if (range_start == last_position) {
55+
target.Delete(range_start);
56+
} else {
57+
target.Delete(range_start, last_position + 1);
58+
}
59+
}
60+
61+
// Emit closed-interval runs; out-of-range positions are silently skipped
62+
// to match `Delete(pos)`.
63+
void CoalesceIntoRanges(std::span<const int64_t> positions,
64+
PositionDeleteIndex& target) {
65+
const size_t n = positions.size();
66+
67+
size_t i = 0;
68+
while (i < n && !IsValidPosition(positions[i])) {
69+
++i;
70+
}
71+
if (i == n) {
72+
return;
73+
}
74+
75+
int64_t range_start = positions[i];
76+
int64_t last_position = range_start;
77+
++i;
78+
79+
for (; i < n; ++i) {
80+
const int64_t pos = positions[i];
81+
if (!IsValidPosition(pos)) {
82+
continue;
83+
}
84+
if (!IsAdjacent(last_position, pos)) {
85+
EmitRange(target, range_start, last_position);
86+
range_start = pos;
87+
}
88+
last_position = pos;
89+
}
90+
91+
EmitRange(target, range_start, last_position);
92+
}
93+
94+
} // namespace
95+
96+
void ForEachPositionDelete(std::span<const int64_t> positions,
97+
PositionDeleteIndex& target) {
98+
if (positions.empty()) {
99+
return;
100+
}
101+
102+
// Below this size the bulk path's fixed overhead exceeds coalescing
103+
// even on fully scattered input; skip the sniff.
104+
constexpr size_t kMinSniffSize = 64;
105+
if (positions.size() < kMinSniffSize) {
106+
CoalesceIntoRanges(positions, target);
107+
return;
108+
}
109+
110+
// Estimate boundary density (fraction of adjacent pairs where
111+
// `pos[i] != pos[i-1] + 1`) over a bounded prefix. Misclassification is
112+
// performance-only -- both paths produce identical contents.
113+
constexpr size_t kSniffSize = 1024;
114+
// 10% threshold: boundary-heavy inputs go to bulk addMany; run-heavy
115+
// inputs stay on coalesce where Roaring's addRange collapses runs.
116+
constexpr size_t kBulkThresholdPercent = 10;
117+
118+
const size_t sniff = std::min(positions.size(), kSniffSize);
119+
size_t boundaries = 0;
120+
for (size_t i = 1; i < sniff; ++i) {
121+
boundaries += static_cast<size_t>(!IsAdjacent(positions[i - 1], positions[i]));
122+
}
123+
124+
// boundaries / (sniff - 1) > kBulkThresholdPercent / 100, without FP.
125+
if (boundaries * 100 > (sniff - 1) * kBulkThresholdPercent) {
126+
// Bulk path: group by high-32-bit key, flush each group via CRoaring's
127+
// `addMany` (through `BulkAddForKey`). The thread-local buffer is
128+
// reused across calls; nested invocations on the same thread would
129+
// corrupt it -- see `\warning` on `ForEachPositionDelete`.
130+
thread_local std::vector<uint32_t> bulk_key_positions;
131+
const size_t n = positions.size();
132+
size_t i = 0;
133+
while (i < n) {
134+
while (i < n && !IsValidPosition(positions[i])) {
135+
++i;
136+
}
137+
if (i == n) {
138+
break;
139+
}
140+
const int32_t key = HighKeyFromPosition(positions[i]);
141+
bulk_key_positions.clear();
142+
while (i < n && IsValidPosition(positions[i]) &&
143+
HighKeyFromPosition(positions[i]) == key) {
144+
bulk_key_positions.push_back(
145+
static_cast<uint32_t>(positions[i] & 0xFFFFFFFFu));
146+
++i;
147+
}
148+
target.BulkAddForKey(key, bulk_key_positions.data(),
149+
bulk_key_positions.size());
150+
}
151+
return;
152+
}
153+
154+
CoalesceIntoRanges(positions, target);
155+
}
156+
157+
} // namespace iceberg
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <cstdint>
23+
#include <span>
24+
25+
#include "iceberg/iceberg_data_export.h"
26+
27+
namespace iceberg {
28+
29+
class PositionDeleteIndex;
30+
31+
/// \brief Apply `positions` to `target` as deletes; semantically equivalent
32+
/// to calling `target.Delete(pos)` for each entry. Out-of-range positions
33+
/// are silently ignored. Sorted, mostly-contiguous input is fastest.
34+
/// Mirrors Java's `PositionDeleteRangeConsumer.forEach`.
35+
///
36+
/// \warning Not safe to call recursively or interleaved on the same thread:
37+
/// the bulk dispatch path uses a thread-local staging buffer that a
38+
/// nested invocation would corrupt. Concurrent calls on different
39+
/// threads are safe with disjoint `target` (see `PositionDeleteIndex`).
40+
void ICEBERG_DATA_EXPORT ForEachPositionDelete(std::span<const int64_t> positions,
41+
PositionDeleteIndex& target);
42+
43+
} // namespace iceberg

src/iceberg/deletes/roaring_position_bitmap.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ void RoaringPositionBitmap::Add(int64_t pos) {
105105
impl_->bitmaps[key].add(pos32);
106106
}
107107

108+
void RoaringPositionBitmap::AddManyForKey(int32_t key, const uint32_t* positions,
109+
size_t n) {
110+
impl_->AllocateBitmapsIfNeeded(key + 1);
111+
impl_->bitmaps[key].addMany(n, positions);
112+
}
113+
108114
void RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {
109115
pos_start = std::max(pos_start, int64_t{0});
110116
pos_end = std::min(pos_end, kMaxPosition + 1);

0 commit comments

Comments
 (0)