leveldb memdb源码分析(下)之Rust实现篇

前言
leveldb中memdb模块使用skiplist作为一个kv的内存存储,相关代码实现非常漂亮。在上文介绍了下面内容:

  • 对比c++和golang版本中查询、插入、删除的实现
  • 分析golang版本中可以优化的地方,然后对rust版本进行优化
然后在本文中将会介绍
  • 如何参考goleveldb的版本使用rust重写memdb(arena版本)
  • 使用rust重写一个非arena版本的memdb,也就是经典的链表结构实现方式
arena实现
参考 goleveldb DB 的代码,同时考虑到并发安全,所以在Rust实现中,分别定义了db 和Db 两个结构
  • db包含所有的成员,非线程安全,提供查询相关方法;
  • Db 对db添加Mutex封装,线程安全,提供核心的插入,删除功能以及更多的查询功能;
具体如下:
db
https://github.com/kingeaster...
struct db { cmp: T,// 比较器,用于比较key // rnd:rand // 存储实际key,value的数据 kv_data: Vec, // 用于记录key,value在kv_data中的索引 ,每一个节点的格式如下 ,其中 level 表示当前节点的层数,后跟 level 个数字,分别表示当前节点的level个层中每一层的下一个节点在node_data中的位置 // kvOffset1|len(key1)|len(value1)|level|next_node_1|...|next_node_i|...|next_node_h| node_data: Vec, // 前面16个字节对应于 head 节点 // 在查询过程中,记录搜索过程中所经历的节点(实际是节点在node_data的位置),主要用于插入和删除 prev_node: RefCell, // 当前skiplist的层数 max_height: usize, // skiplist中的节点个数 n: usize, // 数据总大小 kv_size: usize, }

这个db定义和goleveldb 定义的是非常类似的,没有太多复杂的地方。
不过需要注意的是这里db中的prev_node成员变量,用于在查询或删除过程中记录每一层的前向节点,在golveldb中是一个普通的数组prevNode ,在我们的Rust定义中是一个用RefCell封装的数组 RefCell ,原因在于db的搜索方法有两种使用场景,一种是用于纯粹的搜索查询,那么当前db就是只读的,使用不变借用 &self,如果用于插入或删除,需要往RefCell中插入数据,那么db就变成可变了,需要使用可变借用 &mut self,为了让db保持 不变借用语义,所以使用RefCell来提供内部可变特性。那么为什么要让db保持不变借用,直接不管 纯查询或修改查询都使用可变借用不就行了吗?
因为Rust中不变借用是可以共享的,而可变借用是不可以共享的,如果直接只用可变借用&mut self的话,就会限制纯 查询操作的使用场景,即使一个操作只是查询,也要将db声明为mut。
另外一种方法就是prev_node 不作为db的成员变量,而是在查询的时候作为一个额外的函数入参,具体可以参考 节点版本的做法
封装next节点的读取和设置
github 地址
为了提高代码的可读性和可维护性,将 获取node节点在level层的下一个节点在node_data中的位置的操作和 设置下一个节点的操作进行封装:
// 计算node节点在level层的下一个节点在node_data中的位置 ,封装一下,提高代码可读性 fn next_node(&self, node: usize, i: usize) -> usize { // + NNEXT 表示在node_data 中,从node位置开始,要跳过kvOffset1|len(key1)|len(value1)|level| 这4个字节,后面再移动 i 个位置,就到达 next_node_i 了 self.node_data[node + NNEXT + i] }fn set_next_node(&mut self, node: usize, i: usize, next: usize) { self.node_data[node + NNEXT + i] = next; }

封装读取key或value的数据
github 地址
为了提供代码的可行性和可维护性,将 获取key的真实数据 和获取value的真实数的操作封装
// 根据 node 在 node_data中的位置,求出在kv_data 中的偏移量和长度,从而得到 key
fn get_key_data(&self, node: usize) -> &[u8] { let offset = self.node_data[node]; &self.kv_data[offset..offset + self.node_data[node + NKEY]] }

// 根据 node 在 node_data 中的位置,求出在kv_data 中的偏移量和长度,从而得到 value fn get_value_data(&self, node: usize) -> &[u8] { let key_offset = self.node_data[node] + self.node_data[node + NKEY]; &self.kv_data[key_offset..key_offset + self.node_data[node + NVAL]] }

查询大于等于特定key的节点
github 地址
可以看到经过封装,find_great_or_equal的实现方式与skiplist的算法描述更加贴合。
// save_pre 标记 在搜索过程中是否要记录遍历过的节点 pub fn find_great_or_equal(&self, key: &internalKey, save_pre: bool) -> (usize, bool) { let mut node = 0; // 从高层到底层开始搜索 let mut i = self.max_height - 1; // println!("max_height {}", i); loop { // 下个节点在 node_data 中的位置 let next = self.next_node(node, i); let mut cmp = Ordering::Greater; // 当前链表上没有走到尾 if next > 0 { // 和下个节点next进行key比较 cmp = self.cmp.compare(self.get_key_data(next), key.data()); }// 大于下一个节点,继续沿着当前层 向右 跳 if cmp == Ordering::Less { node = next; } else { // 小于等于下一个节点 或 下一个节点是空 // if save_pre { //// 对于插入或删除而进行的搜索,即使遇到相同的也要继续往下一层比较,不能立即返回 //// 所以这里要先于 cmp == Ordering::Equal 的判断 //self.prev_node.borrow_mut()[i] = node; // } else if cmp == Ordering::Equal { //// find_great_or_equal 跟 find_less 的一个不同就是这里返回的是 next //return (next, true); // }// 改成下面的方式可读性更高 if (!save_pre) && cmp == Ordering::Equal { return (next, true); } if save_pre {// 如果需要保持前向节点,记录到pre_node中 self.prev_node.borrow_mut()[i] = node; }if i == 0 { return (next, cmp == Ordering::Equal); }i -= 1; } } }

在上面实现中,对于当前节点小于等于下一个节点的处理,相比golang的写法进行了重构。参考golang的写法如下:
// 小于等于下一个节点 或 下一个节点是空 if save_pre { // 对于插入或删除而进行的搜索,即使遇到相同的也要继续往下一层比较,不能立即返回 // 所以这里要先于 cmp == Ordering::Equal 的判断 self.prev_node.borrow_mut()[i] = node; } else if cmp == Ordering::Equal { // find_great_or_equal 跟 find_less 的一个不同就是这里返回的是 next return (next, true); }

这里代码的主要含义是:如果只是纯粹的查询操作的话,找到匹配的就可以直接返回了;但是如果是为了插入或删除而进行的查询,即使找到了匹配的节点也要往下一层跳,直到最下面的一层才可以返回。理解了代码的意思我们进行重写
// 改成下面的方式可读性更高
if (!save_pre) && cmp == Ordering::Equal { return (next, true); } if save_pre { self.prev_node.borrow_mut()[i] = node; }

其它
find_lessthan, find_last 这两个method的Rust实现跟goleveldb一致,就不多讲,大家可以直接点击去看源码。
Db
github Db
pub struct Db { db: sync::RwLock>, }

db主要用来提供搜索方法,非线程安全的, Db执行插入和删除,线程安全。
插入put
github 插入
首先获取写锁,在Rust中,sync::RwLock通过调用write() 方法获取写锁:
let mut db = self.db.write().unwrap();
搜索key以获取插入的位置
let (node, exist) = db.find_great_or_equal(key, true);

如果key已经存在,就可以重复使用之前node_data
let (node, exist) = db.find_great_or_equal(key, true); if exist { // TODO 优化,如果新value的长度小于等于旧的value,直接覆盖直接的,不用重新分配 // 如果key已经存在,直接覆盖之前的value // 数据是追加的方式,所以当前kv_data的长度就是node节点的在 skv_data 上的新的偏移量 let offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); } // 更新node的偏移量 db.node_data[node] = offset; // value 的长度可能也变化了 // 之前的长度 let v_old = db.node_data[node + NVAL]; // 更新为新的长度 db.node_data[node + NVAL] = v_len; // 更新数据总大小 db.kv_size += v_len - v_old; return Ok(()); }

对于新插入的点,先计算分配的层高
let h = db.rand_heigth();
同样如果分配的层高比当前skiplist的最大层高还要高,就要给pre_node补偿缺失的前向节点,也就是把head节点补充进去。
// 处理head节点 if h > db.max_height { for i in db.max_height..h + 1 { db.prev_node.borrow_mut()[i] = 0; } db.max_height = h; println!("height {}", h); }

得到新节点数据在kv_data中的起始偏移,然后将key,value数据追加到kv_data后面.
// 新增节点在 kv_data 中的起始偏移 let kv_offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); }

记录当前node_data的长度,也就是新节点在node_data中的起始偏移,然后在node_data中追加新节点,
// 创建新节点,因为是追加方式,所以当前 node_data 的长度 就是新节点在 node_data 的位置 let node = db.node_data.len(); // 添加新节点 db.node_data.push(kv_offset); // 在kv_data中的偏移 db.node_data.push(key.data().len()); // key的长度 db.node_data.push(v_len); // value的长度 db.node_data.push(h); // 当前节点的层高

随后按照goleveldb的写法执行链表插入,goleveldb中是这么写的
// 遍历每层的前向节点 for i, n := range p.prevNode[:h] { m := n + nNext + i // n节点在i层的下一个节点 p.nodeData = https://www.it610.com/article/append(p.nodeData, p.nodeData[m]) // 当前节点第n层的下一个节点指向m p.nodeData[m] = node // n节点在i层的下一个节点指向当前节点node }

遍历prev_node中保存的前向节点,然后执行插入,也就是将当前节点指向前向节点的下一个节点,然后前向节点的下一个节点指向当前节点:
// 遍历每层的前向节点,iter()只会返回Item,利用enumerate封装可以同时返回下标 for (i,n )in db.prev_node.borrow()[0..h].iter().enumerate(){ let next = db.next_node(*n, i); db.node_data.push(next); db.set_next_node(*n, i, node); }

编译器报错:
error[E0502]: cannot borrow `db` as mutable because it is also borrowed as immutable --> src/memdb/memdb.rs:343:13 | 341 |for (i,n )in db.prev_node.borrow()[0..h].iter().enumerate(){ |--------------------- || |immutable borrow occurs here |a temporary with access to the immutable borrow is created here ... 342 |let next = db.next_node(*n, i); 343 |db.node_data.push(next); |^^ mutable borrow occurs here 344 | 345 |}

在for循环的作用域中,同时存在对db的不可变借用和可变借用,那么只能将两者分开;另外为了提高代码可读性,先提前给node_data扩展h长度,用于存储h个下个节点:
db.node_data.resize(node + NNEXT + h, 0);

执行节点插入的代码改写为
// 遍历每一层 for i in 0..h { let n = db.prev_node.borrow()[i]; // 前向节点 let next = db.next_node(n, i); // 前向节点在第i层的下一节点 db.set_next_node(node, i, next); // 当前节点第i层的下一个节点指向next db.set_next_node(n, i, node); // 前向节点在第i层的下一个节点指向当前节点node }

更新统计信息
// 更新数据大小和个数 db.kv_size += key.data().len() + v_len; db.n += 1;

删除
github delete
// 删除 pub fn delete(&mut self, key: &internalKey) -> Option<()> { let mut db = self.db.write().unwrap(); let (node, exist) = db.find_great_or_equal(key, true); if !exist { return None; }// 当前节点有几层 let h = db.node_data[node + NHEIGHT]; // 开始删除, 让前一个节点指向前一个节点的下一个节点的下一个节点 pre->next = pre->next->next for i in 0..h { let pre = db.prev_node.borrow()[i]; // let pre_next = db.next_node(pre, i); // if pre_next != node { //print!("{}:{}", pre_next, node); // }// let next_next = db.next_node(pre_next, i); // db.set_next_node(pre, i, next_next); // 由于 前一个节点的下一个节点 pre_node 就是当前节点 node ,所以上面代码可以优化为 let next_next = db.next_node(node, i); db.set_next_node(pre, i, next_next); }db.kv_size -= db.node_data[node + NKEY] + db.node_data[node + NVAL]; db.n -= 1; Some(()) }

delete的代码中,跟put类似,在遍历pre_node的时候,要通过下标进行访问,获取每一层的前向节点
let pre = db.prev_node.borrow()[i];
另外由于 前一个节点的下一个节点 pre_node 就是当前节点 node ,所以将节点删除的代码从
// 前向节点的下一个节点 let pre_next = db.next_node(pre, i); let next_next = db.next_node(pre_next, i); db.set_next_node(pre, i, next_next);

改为
// 直接使用当前节点 let next_next = db.next_node(node, i); db.set_next_node(pre, i, next_next);

其它
其它的方法比较简单,大家直接看源码即可
节点指针实现
节点定义
node,RcNode 如下:
type RcNode = Rc>; // 每一个节点 struct node { offset: usize,// 对应kv_data 中的起始位置 key_len: usize,// key的长度 value_len: usize,// value的长度 next: Vec>, // 当前节点有 height 层, 第i元素表示第i层的下一个节点 }

由于链表中节点要被其它节点引用,要共享所有权,所以要使用Rc,另外由于在操作中要更改next,所以使用RefCell提供内部可变性,type RcNode = Rc>;
node的next属性中,每一层的下一个节点可能是空,所以使用Option;
为node实现如下获取实际key,value数据的方法,这里由于返回的是&[u8],所以要使用声明周期`a;
// 根据 node 在 node_data中的位置,求出在kv_data 中的偏移量和长度,从而得到 key fn get_key_data<'a>(&self, node: &node, kv_data: &'a [u8]) -> &'a [u8] { &kv_data[node.offset..node.offset + node.key_len] }// 根据 node 在 node_data 中的位置,求出在kv_data 中的偏移量和长度,从而得到 value fn get_value_data<'a>(&self, node: &node, kv_data: &'a [u8]) -> &'a [u8] { &kv_data[node.offset + node.key_len..node.offset + node.key_len + node.value_len] }

db_skip
github db_skip
db_skip的定义如下:
struct db_skip { cmp: T, // 比较器,用于比较key kv_data: Vec, // 存储实际key,value的数据 ,offset从1开始,offset为0的表示head节点 head: RcNode, // 头部, // 当前skiplist的层数 max_height: usize, // skiplist中的节点个数 n: usize, // 数据总大小 kv_size: usize, }

比较于agena版本,这里少了pre_node,多了一个head成员用于保存skiplist的首节点。
封装读取key或value的数据
// 根据 node 在 node_data中的位置,求出在kv_data 中的偏移量和长度,从而得到 key fn get_key_data(&self, node: &node) -> &[u8] { &self.kv_data[node.offset..node.offset + node.key_len] }// 根据 node 在 node_data 中的位置,求出在kv_data 中的偏移量和长度,从而得到 value fn get_value_data(&self, node: &node) -> &[u8] { &self.kv_data[node.offset + node.key_len..node.offset + node.key_len + node.value_len] }

查询大于等于特定key的节点
github find_great_or_equal
pub fn find_great_or_equal( &self, key: &internalKey, pre_node: &mut Option>, // 注意 不能用 Option<&mut Vec> ) -> (RcNode, bool) { let mut node = Rc::clone(&self.head); // 从头节点开始 let mut next_node = Rc::clone(&node); let mut i = self.max_height - 1; loop { // 这里将 cmp 预先设置为 Ordering::Less 是一个非常巧妙的方式, 就可以自动包含 下个节点为空(当作是无穷大)的情况了 let mut cmp = Ordering::Less; if let Some(ref next) = node.borrow().next[i] { // 下一个节点存在 cmp = self .cmp .compare(key.data(), self.get_key_data(&node.borrow())); next_node = Rc::clone(&next); }// 大于下一个节点,继续沿着当前层 向右 跳 if cmp == Ordering::Greater { node = Rc::clone(&next_node); continue; }// 走到这里,说明: node 小于等于下一个节点 或 下一个节点是空// 如果不保存前向节点,只是普通的搜索,找到匹配就直接返回 if (pre_node.is_none()) && cmp == Ordering::Equal { return (next_node, true); }// 如果保存前向节点node if let Some(ref mut pre) = pre_node { pre.push(Rc::clone(&node)); }if i == 0 { return (next_node, cmp == Ordering::Equal); }i -= 1; } }

首先把pre_node作为入参传入find_great_or_equal , pre_node: &mut Option> 因为pre_node要存储每一层的前向节点,是可变的所以&mut ,另外由于pre_node是可选的,所以使用Option。这里注意不能写成 pre_node: Option<&mut Vec>,这样在从Option中提取Vec的时候就会造成pre_node的move,导致编译失败。
接下来使用node.borrow()获取当前节点的不变借用,然后通过next[i]获取第i层的下一节点,由于是Option类型,通过 if let Some(ref next)来在next节点存在的情况下获取next节点的引用。
// 如果下一个节点存储 if let Some(ref next) = node.borrow().next[i] { // 下一个节点存在,和下个节点进行比较 cmp = self.cmp.compare(key.data(), self.get_key_data(&next.borrow())); next_node = Rc::clone(&next); }

另外注意由于next的声明周期只在 if let {}内,所以要通过 next_node = Rc::clone(&next); 记录下来用于下一步迭代。
根据比较结果,如果大于下一节点,就沿着当前层跳到下一节点
// 大于下一个节点,继续沿着当前层 向右 跳 if cmp == Ordering::Greater { node = Rc::clone(&next_node); continue; }

接下来同理,优先判断如果不保存前向节点且找到匹配节点的情况:
// 走到这里,说明: node 小于等于下一个节点 或 下一个节点是空// 如果不保存前向节点,只是普通的搜索,找到匹配就直接返回 if (pre_node.is_none()) && cmp == Ordering::Equal { return (next_node, true); }

如果保存前向节点,利用 if let Some(ref mut pre)=pre_node 从pre_node获取Vec的可变借用,然后将 node 的共享借用放入:
// 如果保存前向节点node if let Some(ref mut pre) = pre_node { pre.push(Rc::clone(&node)); }

最后就是如果到底层了,就返回,没有到底层,就跳过下一层:
// 如果到了最后一层,就返回 if i == 0 { return (next_node, cmp == Ordering::Equal); }i -= 1;

其它
其它部分比较简单,直接看源码就可以了。
DBSkip
pub struct DBSkip { db: sync::RwLock>, }

put
key已经存在的处理逻辑如下,跟arena版本处理逻辑差不多,差别就在于更新当前节点node时候,使用borrow_mut()获取当前节点的可变借用进行修改:
// 如果key已经存在,直接覆盖之前的value // 数据是追加的方式,所以当前kv_data的长度就是node节点的在 skv_data 上的新的偏移量 let offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); } // 更新node的偏移量 node.borrow_mut().offset = offset; // value 的长度可能也变化了 // 之前的长度 let v_old = node.borrow().value_len; // 更新为新的长度 node.borrow_mut().value_len = v_len; // 更新数据总大小 db.kv_size += v_len - v_old; return Ok(());

如果是新增的key,先分配层高,根据层高处理pre_node
let mut pre_node = pre_node.unwrap(); let h = db.rand_heigth(); // 处理head节点 if h > db.max_height { // 补充 高出的部分 for i in db.max_height..h{ pre_node.push(Rc::clone(&db.head)); } db.max_height = h; println!("height {}", h); }

保存新节点数据在kv_data中的起始偏移 ,然后给kv_data追加key,value数据。
// 新增节点在 kv_data 中的起始偏移 let kv_offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); }

Vec的append方法pub fn append(&mut self, other: &mut Self) 对应说明 Moves all the elements of other into Self, leaving other empty.
创建新节点
let node = Rc::new(RefCell::new(node::new( kv_offset, key.data().len(), v_len, h, )));

对查询经过的每一层链表执行插入:
// 执行插入 for (i, pre) in pre_node.iter().enumerate() { // 新节点->next=pre->next if let Some(ref pre_next) = pre.borrow().next[i] { node.borrow_mut().next[i] = Some(Rc::clone(pre_next)); }// pre->next = 新节点 pre.borrow_mut().next[i] = Some(Rc::clone(&node)); }

更新统计数据
db.kv_size+=key.data().len()+v_len; db.n+=1; Ok(())

删除
看懂put方法,delete方法就简单许多
pub fn delete(&mut self, key: &internalKey) -> Option<()> { let mut db = self.db.write().unwrap(); let mut pre_node = Some(vec![]); let (node, exist) = db.find_great_or_equal(key, &mut pre_node); if !exist{ return None; } let pre_node = pre_node.unwrap(); // 执行删除 for (i, pre) in pre_node.iter().enumerate() { // 前向节点执行当前节点的下一个节点 pre->next = node->next if let Some(ref node_next) = node.borrow().next[i] { // node_next: 当前节点在第i层的下一跳节点 pre.borrow_mut().next[i] = Some(Rc::clone(node_next)); } }db.kv_size-=node.borrow().key_len+node.borrow().value_len; db.n -=1; Some(()) }

参考资料
跳跃表 https://www.bookstack.cn/read...
跳跃链表 https://www.cnblogs.com/s-lis...
level-rs完整项目地址 https://github.com/kingeaster...
【leveldb memdb源码分析(下)之Rust实现篇】leveldb memdb源码分析(下)之Rust实现篇
文章图片

    推荐阅读