Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/tree_store/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl UntypedBtreeMut {
// Applies visitor to all dirty leaf pages in the tree
pub(crate) fn dirty_leaf_visitor<F>(&mut self, visitor: F) -> Result
where
F: Fn(PageMut) -> Result,
F: for<'a> Fn(PageMut<'a>) -> Result,
{
if let Some(page_number) = self.root.map(|x| x.root) {
if !self.mem.uncommitted(page_number) {
Expand All @@ -234,7 +234,7 @@ impl UntypedBtreeMut {

fn dirty_leaf_visitor_helper<F>(&mut self, page_number: PageNumber, visitor: &F) -> Result
where
F: Fn(PageMut) -> Result,
F: for<'a> Fn(PageMut<'a>) -> Result,
{
assert!(self.mem.uncommitted(page_number));
let page = self.mem.get_page_mut(page_number)?;
Expand Down Expand Up @@ -528,12 +528,12 @@ impl<K: Key + 'static, V: Value + 'static> BtreeMut<'_, K, V> {
}
}

fn get_mut_helper(
&mut self,
parent: Option<(PageMut, usize)>,
mut page: PageMut,
fn get_mut_helper<'txn>(
&'txn mut self,
parent: Option<(PageMut<'txn>, usize)>,
mut page: PageMut<'txn>,
query: &[u8],
) -> Result<Option<AccessGuardMut<'_, V>>> {
) -> Result<Option<AccessGuardMut<'txn, V>>> {
let node_mem = page.memory();
match node_mem[0] {
LEAF => {
Expand Down
88 changes: 43 additions & 45 deletions src/tree_store/btree_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ enum OnDrop {
},
}

enum EitherPage {
enum EitherPage<'txn> {
Immutable(PageImpl),
Mutable(PageMut),
Mutable(PageMut<'txn>),
OwnedMemory(Vec<u8>),
ArcMemory(Arc<[u8]>),
}

impl EitherPage {
impl EitherPage<'_> {
fn memory(&self) -> &[u8] {
match self {
EitherPage::Immutable(page) => page.memory(),
Expand All @@ -164,24 +164,21 @@ impl EitherPage {
///
/// When this structure is dropped (goes out of scope), the data is released
pub struct AccessGuard<'a, V: Value + 'static> {
page: EitherPage,
page: EitherPage<'a>,
offset: usize,
len: usize,
on_drop: OnDrop,
_value_type: PhantomData<V>,
// Used so that logical references into a Table respect the appropriate lifetime
_lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuard<'_, V> {
impl<'a, V: Value + 'static> AccessGuard<'a, V> {
pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
Self {
page: EitherPage::Immutable(page),
offset: range.start,
len: range.len(),
on_drop: OnDrop::None,
_value_type: Default::default(),
_lifetime: Default::default(),
}
}

Expand All @@ -192,7 +189,6 @@ impl<V: Value + 'static> AccessGuard<'_, V> {
len: range.len(),
on_drop: OnDrop::None,
_value_type: Default::default(),
_lifetime: Default::default(),
}
}

Expand All @@ -204,12 +200,11 @@ impl<V: Value + 'static> AccessGuard<'_, V> {
len,
on_drop: OnDrop::None,
_value_type: Default::default(),
_lifetime: Default::default(),
}
}

pub(super) fn remove_on_drop(
page: PageMut,
page: PageMut<'a>,
offset: usize,
len: usize,
position: usize,
Expand All @@ -224,7 +219,6 @@ impl<V: Value + 'static> AccessGuard<'_, V> {
fixed_key_size,
},
_value_type: Default::default(),
_lifetime: Default::default(),
}
}

Expand Down Expand Up @@ -255,12 +249,11 @@ impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
}

pub struct AccessGuardMut<'a, V: Value + 'static> {
value: Vec<u8>,
page: PageNumber,
page: PageMut<'a>,
offset: usize,
len: usize,
entry_index: usize,
parent: Option<(PageNumber, usize)>,
parent: Option<(PageMut<'a>, usize)>,
mem: Arc<TransactionalMemory>,
allocated: Arc<Mutex<PageTrackerPolicy>>,
root_ref: &'a mut BtreeHeader,
Expand All @@ -271,11 +264,11 @@ pub struct AccessGuardMut<'a, V: Value + 'static> {
impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
page: PageMut,
page: PageMut<'a>,
offset: usize,
len: usize,
entry_index: usize,
parent: Option<(PageMut, usize)>,
parent: Option<(PageMut<'a>, usize)>,
mem: Arc<TransactionalMemory>,
allocated: Arc<Mutex<PageTrackerPolicy>>,
root_ref: &'a mut BtreeHeader,
Expand All @@ -285,14 +278,12 @@ impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
if let Some((ref parent_page, _)) = parent {
assert!(mem.uncommitted(parent_page.get_page_number()));
}
let value = page.memory()[offset..(offset + len)].to_vec();
AccessGuardMut {
value,
page: page.get_page_number(),
page,
offset,
len,
entry_index,
parent: parent.map(|(p, i)| (p.get_page_number(), i)),
parent,
mem,
allocated,
root_ref,
Expand All @@ -303,35 +294,33 @@ impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {

/// Access the stored value
pub fn value(&self) -> V::SelfType<'_> {
V::from_bytes(&self.value)
V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
}

/// Replace the stored value
pub fn insert<'v>(&mut self, value: impl Borrow<V::SelfType<'v>>) -> Result<()> {
let value_bytes = V::as_bytes(value.borrow());
self.value = value_bytes.as_ref().to_vec();

let mut page = self.mem.get_page_mut(self.page)?;

// TODO: optimize this to avoid copying the key
let key_bytes = {
let accessor = LeafAccessor::new(page.memory(), self.key_width, V::fixed_width());
let accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
accessor.key_unchecked(self.entry_index).to_vec()
};

if LeafMutator::sufficient_insert_inplace_space(
&page,
&self.page,
self.entry_index,
true,
self.key_width,
V::fixed_width(),
key_bytes.as_slice(),
value_bytes.as_ref(),
) {
let mut mutator = LeafMutator::new(page.memory_mut(), self.key_width, V::fixed_width());
let mut mutator =
LeafMutator::new(self.page.memory_mut(), self.key_width, V::fixed_width());
mutator.insert(self.entry_index, true, &key_bytes, value_bytes.as_ref());
} else {
let accessor = LeafAccessor::new(page.memory(), self.key_width, V::fixed_width());
let accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
let mut builder = LeafBuilder::new(
&self.mem,
&self.allocated,
Expand All @@ -352,18 +341,16 @@ impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
let new_page = builder.build()?;

// Update parent branch page if it exists, otherwise update root
if let Some((ref mut parent_page_number, parent_entry_index)) = self.parent {
let mut parent_page = self.mem.get_page_mut(*parent_page_number)?;
if let Some((ref mut parent_page, parent_entry_index)) = self.parent {
let mut mutator = BranchMutator::new(parent_page.memory_mut());
mutator.write_child_page(parent_entry_index, new_page.get_page_number(), DEFERRED);
} else {
self.root_ref.root = new_page.get_page_number();
self.root_ref.checksum = DEFERRED;
}

let old_page_number = page.get_page_number();
self.page = new_page.get_page_number();
page = new_page;
let old_page_number = self.page.get_page_number();
self.page = new_page;
let mut allocated = self.allocated.lock().unwrap();
assert!(
self.mem
Expand All @@ -372,7 +359,7 @@ impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
}

// Update our page reference to the new page and recalculate offset/length
let new_accessor = LeafAccessor::new(page.memory(), self.key_width, V::fixed_width());
let new_accessor = LeafAccessor::new(self.page.memory(), self.key_width, V::fixed_width());
let (new_start, new_end) = new_accessor.value_range(self.entry_index).unwrap();

self.offset = new_start;
Expand All @@ -382,23 +369,27 @@ impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
}
}

impl<V: Value + 'static> Drop for AccessGuardMut<'_, V> {
fn drop(&mut self) {
// no-op. This Drop impl is only here to ensure that self is dropped before the transaction
// is committed. (i.e. that the lifetime 'a is shorter than the transaction)
}
}

pub struct AccessGuardMutInPlace<'a, V: Value + 'static> {
page: PageMut,
page: PageMut<'a>,
offset: usize,
len: usize,
_value_type: PhantomData<V>,
// Used so that logical references into a Table respect the appropriate lifetime
_lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuardMutInPlace<'_, V> {
pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
impl<'a, V: Value + 'static> AccessGuardMutInPlace<'a, V> {
pub(crate) fn new(page: PageMut<'a>, offset: usize, len: usize) -> Self {
AccessGuardMutInPlace {
page,
offset,
len,
_value_type: Default::default(),
_lifetime: Default::default(),
}
}
}
Expand All @@ -409,6 +400,13 @@ impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMutInPla
}
}

impl<V: Value + 'static> Drop for AccessGuardMutInPlace<'_, V> {
fn drop(&mut self) {
// no-op. This Drop impl is only here to ensure that self is dropped before the transaction
// is committed. (i.e. that the lifetime 'a is shorter than the transaction)
}
}

// Provides a simple zero-copy way to access entries
pub struct EntryAccessor<'a> {
key: &'a [u8],
Expand Down Expand Up @@ -696,7 +694,7 @@ impl<'a, 'b> LeafBuilder<'a, 'b> {
required_size > self.mem.get_page_size() && self.pairs.len() > 1
}

pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
pub(super) fn build_split<'txn>(self) -> Result<(PageMut<'txn>, &'a [u8], PageMut<'txn>)> {
let total_size = self.total_key_bytes + self.total_value_bytes;
let mut division = 0;
let mut first_split_key_bytes = 0;
Expand Down Expand Up @@ -748,7 +746,7 @@ impl<'a, 'b> LeafBuilder<'a, 'b> {
Ok((page1, self.pairs[division - 1].0, page2))
}

pub(super) fn build(self) -> Result<PageMut> {
pub(super) fn build<'txn>(self) -> Result<PageMut<'txn>> {
let required_size = self.required_bytes(
self.pairs.len(),
self.total_key_bytes + self.total_value_bytes,
Expand Down Expand Up @@ -1444,7 +1442,7 @@ impl<'a, 'b> BranchBuilder<'a, 'b> {
}
}

pub(super) fn build(self) -> Result<PageMut> {
pub(super) fn build<'txn>(self) -> Result<PageMut<'txn>> {
assert_eq!(self.children.len(), self.keys.len() + 1);
let size = RawBranchBuilder::required_bytes(
self.keys.len(),
Expand Down Expand Up @@ -1474,7 +1472,7 @@ impl<'a, 'b> BranchBuilder<'a, 'b> {
size > self.mem.get_page_size() && self.keys.len() >= 3
}

pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
pub(super) fn build_split<'txn>(self) -> Result<(PageMut<'txn>, &'a [u8], PageMut<'txn>)> {
let mut allocated_pages = self.allocated_pages.lock().unwrap();
assert_eq!(self.children.len(), self.keys.len() + 1);
assert!(self.keys.len() >= 3);
Expand Down
14 changes: 9 additions & 5 deletions src/tree_store/page_store/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -239,20 +240,23 @@ impl Clone for PageImpl {
}
}

pub(crate) struct PageMut {
// The lifetime should be bound to the lifetime of the transaction in which this page was opened.
// It is used in the Drop impl to ensure that the page is dropped before the transaction is committed.
pub(crate) struct PageMut<'txn> {
pub(super) mem: WritablePage,
pub(super) page_number: PageNumber,
pub(super) _lifetime: PhantomData<&'txn ()>,
#[cfg(debug_assertions)]
pub(super) open_pages: Arc<Mutex<HashSet<PageNumber>>>,
}

impl PageMut {
impl PageMut<'_> {
pub(crate) fn memory_mut(&mut self) -> &mut [u8] {
self.mem.mem_mut()
}
}

impl Page for PageMut {
impl Page for PageMut<'_> {
fn memory(&self) -> &[u8] {
self.mem.mem()
}
Expand All @@ -262,9 +266,9 @@ impl Page for PageMut {
}
}

#[cfg(debug_assertions)]
impl Drop for PageMut {
impl Drop for PageMut<'_> {
fn drop(&mut self) {
#[cfg(debug_assertions)]
assert!(self.open_pages.lock().unwrap().remove(&self.page_number));
}
}
Expand Down
Loading
Loading