Skip to content

Commit cb0a1e6

Browse files
richox and zhangli20 authored
use memory prefetch in hash map building (#571)
Co-authored-by: zhangli20 <zhangli20@kuaishou.com>
1 parent dce12db commit cb0a1e6

File tree

1 file changed

+22
-8
lines changed

1 file changed

+22
-8
lines changed

native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs

Lines changed: 22 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -107,7 +107,7 @@ impl Table {
107107
let mut num_valid_items = 0;
108108

109109
// collect map items
110-
let mut map_items = vec![];
110+
let mut map_items = unchecked!(vec![]);
111111
for (hash, chunk) in hashes
112112
.into_iter()
113113
.enumerate()
@@ -149,18 +149,32 @@ impl Table {
149149
let mut map = unchecked!(Vec::with_capacity((1usize << map_mod_bits) + 16));
150150
map.resize(1 << map_mod_bits, MapValueGroup::default());
151151

152-
for (hash, item) in map_items {
153-
let mut i = (hash % (1 << map_mod_bits)) as usize;
152+
macro_rules! entries {
153+
[$i:expr] => (map_items[$i].0 % (1 << map_mod_bits))
154+
}
155+
156+
const PREFETCH_AHEAD: usize = 4;
157+
if map_items.len() >= PREFETCH_AHEAD {
158+
for i in 1..PREFETCH_AHEAD - 1 {
159+
prefetch_read_data!(&map[entries![i] as usize]);
160+
}
161+
}
162+
for i in 0..map_items.len() {
163+
if i + PREFETCH_AHEAD < map_items.len() {
164+
prefetch_read_data!(&map[entries![i + PREFETCH_AHEAD] as usize]);
165+
}
166+
167+
let mut e = entries![i] as usize;
154168
loop {
155-
let empty = map[i].hashes.simd_eq(Simd::splat(0));
169+
let empty = map[e].hashes.simd_eq(Simd::splat(0));
156170
if let Some(j) = empty.first_set() {
157-
map[i].hashes.as_mut_array()[j] = hash;
158-
map[i].values[j] = item;
171+
map[e].hashes.as_mut_array()[j] = map_items[i].0;
172+
map[e].values[j] = map_items[i].1;
159173
break;
160174
}
161175

162-
i += 1;
163-
if i == map.len() {
176+
e += 1;
177+
if e == map.len() {
164178
map.push(MapValueGroup::default());
165179
}
166180
}

0 commit comments

Comments
 (0)