Skip to content

Commit aecde8e

Browse files
committed
Using an unrolled linked list to halve memory usage
1 parent 187486f commit aecde8e

10 files changed

Lines changed: 617 additions & 178 deletions

File tree

src/lib.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::borrow::Cow;
2+
use std::sync::Arc;
23

34
mod block_read_write;
45

@@ -28,6 +29,56 @@ impl<'a> Record<'a> {
2829
}
2930
}
3031

32+
/// Reference-counted identifier of a WAL file.
///
/// Cloning a `FileNumber` only bumps the reference count of the shared number, which is what
/// lets [`FileNumber::can_be_deleted`] tell whether any other handle is still alive.
#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub struct FileNumber {
    file_number: Arc<u64>,
}

impl FileNumber {
    /// Wraps `file_number` into a fresh handle that has no other clone yet.
    fn new(file_number: u64) -> Self {
        let file_number = Arc::new(file_number);
        Self { file_number }
    }

    /// Returns whether there is no clone of this FileNumber in existence.
    ///
    /// /!\ care should be taken to not have some other code store a &FileNumber which could alias
    /// with self, as it might then be semantically incorrect to delete content based only on this
    /// returning `true`.
    pub fn can_be_deleted(&self) -> bool {
        // `self` is the only handle exactly when the strong count is one.
        let num_handles = Arc::strong_count(&self.file_number);
        num_handles == 1
    }

    /// Collects the number of this file followed by the numbers of every file the tracker
    /// reports after it, in the order the tracker returns them.
    #[cfg(test)]
    pub fn unroll(&self, tracker: &crate::rolling::FileTracker) -> Vec<u64> {
        let mut cursor = self.clone();
        let mut collected = vec![cursor.file_number()];
        while let Some(successor) = tracker.next(&cursor) {
            collected.push(successor.file_number());
            cursor = successor;
        }
        collected
    }

    /// Returns the file name associated with this file number: `wal-` followed by the number,
    /// zero-padded to 20 digits.
    pub fn filename(&self) -> String {
        let number: u64 = *self.file_number;
        format!("wal-{:020}", number)
    }

    /// Returns the raw file number.
    #[cfg(test)]
    pub fn file_number(&self) -> u64 {
        *self.file_number
    }

    /// Test-only constructor exposing the private `new`.
    #[cfg(test)]
    pub fn for_test(file_number: u64) -> Self {
        Self::new(file_number)
    }
}
81+
3182
/// Resources used by mrecordlog
3283
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3384
pub struct ResourceUsage {

src/mem/arena.rs

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
use std::time::{Instant, Duration};
2+
3+
/// Size of a page in bytes (1MB).
#[cfg(not(test))]
pub const PAGE_SIZE: usize = 1 << 20;

/// Size of a page in bytes. Tests use a tiny page size.
#[cfg(test)]
pub const PAGE_SIZE: usize = 7;

// TODO make it an array once we get a way to allocate array on the heap.
/// A heap-allocated page. The arena always allocates it with `PAGE_SIZE` bytes.
pub type Page = Box<[u8]>;

/// Identifier of a page: its index in the arena's page table.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct PageId(usize);

/// An arena of fixed sized pages.
#[derive(Default)]
pub struct Arena {
    /// We use a vector to store the list of pages.
    /// It can be seen as an efficient map from page id to pages.
    ///
    /// This vector's len can only grow. Its length is therefore the maximum number of pages
    /// that were ever allocated at the same time. An entry is tiny compared to a page,
    /// so this is not a problem.
    ///
    /// If a page is not allocated, the corresponding entry is `None`.
    pages: Vec<Option<Page>>,
    /// `free_slots` keeps track of the entries of `pages` that are not allocated (`None`).
    free_slots: Vec<PageId>,
    /// `free_page_ids` keeps track of the pages that are allocated but not in use,
    /// and can therefore be handed out again without allocating.
    free_page_ids: Vec<PageId>,
    /// Arena stats used to track how many pages should be freed.
    stats: ArenaStats,
}

// The idea here is that we keep track of the maximum number of pages used through time.
// To identify if it is worth deallocating pages, we look at the maximum number of pages
// in use over the former and the current time windows (see `WINDOW`).
//
// We then allow ourselves to free memory down to (a bit above) this value.
// Tracking exactly the maximum number of pages in use over a sliding time range is
// unnecessarily complicated.
//
// For instance, we could run an extra task or thread.
//
// Instead, we just run a routine whenever someone interacts with the GC.
// This routine only checks time 1 out of 256 calls.
//
// Pitfall: If pages are released way less often than 256 times per window,
// this arena may take way too much time to release its memory.
struct ArenaStats {
    /// Maximum number of used pages recorded during the previous window.
    max_num_used_pages_former: usize,
    /// Maximum number of used pages recorded so far during the current window.
    max_num_used_pages_current: usize,
    /// Wrapping counter used to only look at the clock 1 out of 256 calls.
    call_counter: u8,
    /// Instant after which the current window is considered over.
    next_window_start: Instant,
}

/// Duration of a stats window.
const WINDOW: Duration = Duration::from_secs(60);

impl Default for ArenaStats {
    fn default() -> ArenaStats {
        ArenaStats {
            // We arbitrarily initialize num used pages former to 100.
            max_num_used_pages_former: 100,
            max_num_used_pages_current: 0,
            call_counter: 0u8,
            next_window_start: Instant::now(),
        }
    }
}

impl ArenaStats {
    /// This method is called when we are changing time window.
    ///
    /// The current window becomes the former one, and a new window ending at
    /// `now + WINDOW` is started.
    fn roll(&mut self, now: Instant) {
        self.max_num_used_pages_former = self.max_num_used_pages_current;
        self.max_num_used_pages_current = 0;
        self.next_window_start = now + WINDOW;
    }

    /// Records the current number of used pages and returns the target number of
    /// allocated pages (see `target_num_pages`).
    pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize {
        // The only function of the call counter is to avoid calling `Instant::now()`
        // at every single call.
        self.call_counter = self.call_counter.wrapping_add(1);
        if self.call_counter == 0u8 {
            let now = Instant::now();
            if now > self.next_window_start {
                self.roll(now);
            }
        }
        self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages);
        self.target_num_pages()
    }

    // This method returns a target number of pages.
    //
    // If we currently have a number of allocated pages higher than this, we need to free
    // pages until we reach this number.
    //
    // The target is the maximum usage over both windows, plus some slack: 10 pages or 5%,
    // whichever is larger.
    fn target_num_pages(&self) -> usize {
        let max_over_both_windows = self
            .max_num_used_pages_former
            .max(self.max_num_used_pages_current);
        (max_over_both_windows + 10).max(max_over_both_windows * 105 / 100)
    }
}

impl Arena {
    /// Returns the id of an allocated page, now considered in use.
    ///
    /// A free allocated page is reused if there is one. Otherwise a new zeroed page is
    /// allocated, preferably in a free slot of the page table.
    pub fn get_page_id(&mut self) -> PageId {
        if let Some(page_id) = self.free_page_ids.pop() {
            assert!(self.pages[page_id.0].is_some());
            return page_id;
        }
        let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice();
        if let Some(free_slot) = self.free_slots.pop() {
            let slot = &mut self.pages[free_slot.0];
            assert!(slot.is_none());
            *slot = Some(page);
            free_slot
        } else {
            let new_page_id = self.pages.len();
            self.pages.push(Some(page));
            PageId(new_page_id)
        }
    }

    /// Returns the content of the page `page_id`.
    ///
    /// # Panics
    ///
    /// Panics if the page is not allocated.
    #[inline]
    pub fn page(&self, page_id: PageId) -> &[u8] {
        self.pages[page_id.0]
            .as_deref()
            .expect("page is not allocated")
    }

    /// Returns the mutable content of the page `page_id`.
    ///
    /// # Panics
    ///
    /// Panics if the page is not allocated.
    #[inline]
    pub fn page_mut(&mut self, page_id: PageId) -> &mut [u8] {
        self.pages[page_id.0]
            .as_deref_mut()
            .expect("page is not allocated")
    }

    /// Marks the page `page_id` as not in use anymore, and runs `gc`.
    ///
    /// The page stays allocated (and can be handed out again by `get_page_id`) unless
    /// `gc` decides to deallocate it.
    ///
    /// # Panics
    ///
    /// Panics if the page is not allocated.
    pub fn release_page(&mut self, page_id: PageId) {
        assert!(self.pages[page_id.0].is_some());
        self.free_page_ids.push(page_id);
        self.gc();
    }

    /// `gc` releases memory by deallocating free pages, until the number of allocated
    /// pages is down to the target computed by the arena stats.
    ///
    /// If the number of allocated pages is already at or below the target, nothing is freed.
    pub fn gc(&mut self) {
        let num_used_pages = self.num_used_pages();
        let target_num_pages = self.stats.record_num_used_page(num_used_pages);
        assert!(target_num_pages >= num_used_pages);
        // Most of the time we are below the target: a plain subtraction would underflow.
        let num_pages_to_free = self
            .num_allocated_pages()
            .saturating_sub(target_num_pages);
        // Since target >= used, we never need to free more pages than there are free pages.
        assert!(num_pages_to_free <= self.free_page_ids.len());
        let first_freed = self.free_page_ids.len() - num_pages_to_free;
        // Pages are deallocated starting from the most recently released one.
        for page_id in self.free_page_ids.drain(first_freed..).rev() {
            self.pages[page_id.0] = None;
            self.free_slots.push(page_id);
        }
    }

    /// Returns the number of pages that are allocated
    /// (regardless of whether they are in use or not).
    pub fn num_allocated_pages(&self) -> usize {
        self.pages.len() - self.free_slots.len()
    }

    /// Returns the number of pages that are allocated AND actually used.
    pub fn num_used_pages(&self) -> usize {
        self.pages.len() - self.free_slots.len() - self.free_page_ids.len()
    }

    /// Returns the number of bytes held by all of the allocated pages.
    pub fn capacity(&self) -> usize {
        self.num_allocated_pages() * PAGE_SIZE
    }

    /// Returns the number of bytes held by pages that are allocated but not in use.
    pub fn unused_capacity(&self) -> usize {
        self.free_page_ids.len() * PAGE_SIZE
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_simple() {
        let mut arena = Arena::default();
        assert_eq!(arena.capacity(), 0);
        assert_eq!(arena.get_page_id(), PageId(0));
        assert_eq!(arena.get_page_id(), PageId(1));
        arena.release_page(PageId(0));
        assert_eq!(arena.get_page_id(), PageId(0));
    }

    #[test]
    fn test_arena_gc_retains_pages_below_target() {
        let mut arena = Arena::default();
        assert_eq!(arena.capacity(), 0);
        assert_eq!(arena.get_page_id(), PageId(0));
        assert_eq!(arena.get_page_id(), PageId(1));
        arena.release_page(PageId(1));
        assert_eq!(arena.num_allocated_pages(), 2);
        // We are way below the target number of pages: the free page is retained.
        arena.gc();
        assert_eq!(arena.num_allocated_pages(), 2);
        assert_eq!(arena.num_used_pages(), 1);
        assert_eq!(arena.get_page_id(), PageId(1));
        assert_eq!(arena.num_allocated_pages(), 2);
    }

    #[test]
    fn test_arena_gc_frees_pages_above_target() {
        let mut arena = Arena::default();
        let page_ids: Vec<PageId> = (0..30).map(|_| arena.get_page_id()).collect();
        for page_id in page_ids {
            arena.release_page(page_id);
        }
        assert_eq!(arena.num_allocated_pages(), 30);
        assert_eq!(arena.num_used_pages(), 0);
        // We simulate two windows elapsing without any page in use.
        let now = Instant::now();
        arena.stats.roll(now);
        arena.stats.roll(now);
        arena.gc();
        // No usage recorded: the target is the 10 pages of slack.
        assert_eq!(arena.num_allocated_pages(), 10);
        assert_eq!(arena.unused_capacity(), 10 * PAGE_SIZE);
        for _ in 0..10 {
            arena.get_page_id();
        }
        assert_eq!(arena.num_allocated_pages(), 10);
        // No free page left: a new page is allocated in a freed slot.
        assert_eq!(arena.get_page_id(), PageId(10));
        assert_eq!(arena.num_allocated_pages(), 11);
    }
}

src/mem/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
mod queue;
22
mod queues;
3+
mod arena;
4+
mod rolling_buffer;
35

6+
7+
use self::arena::{Arena, PAGE_SIZE};
8+
use self::rolling_buffer::RollingBuffer;
49
pub(crate) use self::queue::MemQueue;
510
pub(crate) use self::queues::MemQueues;
611

12+
713
#[cfg(test)]
814
mod tests;

0 commit comments

Comments
 (0)