CMU15-445 (2022fall) project实现笔记
数据库系统实现技术大作业
这是CMU 15-445 2022fall 的配套项目 bustub 2022fall,也是《数据库系统实现技术》这门选修课的大作业。这里简单记录一下作业的思路。
Project #0 - C++ Primer
这个项目是做一个简单的Trie树,不属于bustub的主体部分,实现起来也很简单。核心数据结构就是TrieNode, 表示树中的一个顶点,is_end_成员表示该单词是否结束。整体没啥难的。
class TrieNode{
protected:
char key_char_;
/** whether this node marks the end of a key */
bool is_end_{false};
/** A map of all child nodes of this trie node, which can be accessed by each
* child node's key char. */
std::unordered_map<char, std::unique_ptr<TrieNode>> children_;
}
主要是熟悉C++以及bustub的编码风格,以及一些工具的使用,包括用CMake\Make构建项目、用Clang-tidy来优化编码风格、用GDB来debug、用第三方库GoogleTest来进行单例测试等。
Project #1 - Buffer Pool
从这个部分开始,就进入Bustub的主体部分了。目标就是实现一个Buffer Pool Manager, 分成了3个小任务。
Task #1 Extendible Hash Table
这一部分是实现一个可扩展哈希表, 所谓可扩展,就是避免传统哈希表的rehash的开销(rehash就是传统的不可扩展的哈希表的加载因子比较高的时候,会进行扩容+rehash来减少冲突,开销很大)。
核心思想是在冲突较大的时候,即bucket满的时候,对bucket进行分裂, 且分裂时只涉及哈希表中相关指针(指向该桶的元素)进行修改,而不需要对其他元素的进行重排。另外,当local_depth (将要)大于 global_depth时需要对哈希表进行扩容,但是这个扩容不需要rehash, 只是指针的简单复制而已。
不得不说,这是一个非常巧妙的数据结构。
为了保证并发安全,我设计了以下三种的锁来保证并发安全,都是shared_mutex,即读写锁,提高并发性能。
class ExtendibleHashTable : public HashTable<K, V> {
mutable std::shared_mutex global_depth_latch_; // 保护global_depth_
mutable std::shared_mutex dir_latch_; // 保护哈希表vector
class Bucket {
std::shared_mutex bucket_latch_; // 桶级别的锁,保护桶里面成员变量
}
}
Task #2 LRU-K Replacement Policy
这一部分是实现一个LRU-K replacer, 与LRU相同点和不同点在于:当一个page被访问了k次以下时,表现和LRU一致;而当一个page被访问了k次及以上后,就会根据第k次前的时间戳进行淘汰。
在设计时,我采用了HashMap+双向链表的方法来存储历史访问记录,链表在逻辑上以访问次数为k分为两个队列,我将这两个队列统一在了一个链表中,中间通过一个叫做特殊的KTailNode的节点分割开(我也不知道当时为什么我要将两个队列统一在一个链表里,现在想来完全没有必要)。
LRUKReplacer成员变量如下:
class LRUKReplacer {
std::atomic<size_t> current_timestamp_{0};
std::atomic<size_t> curr_size_{0};
size_t k_;
std::unordered_map<frame_id_t, DListNode *> map_;
DListNode *head_, *tail_;
DListNode *k_tail_;
mutable std::shared_mutex map_latch_; // 保护map_
mutable std::shared_mutex nodes_latch_; // 保护双向链表
}
另外,每次调用LRUKReplacer的Access方法时,都会使current_timestamp_++;DListNode有个evictable_ 成员变量,用来标识是否可evict。
其实我个人对LRU-K replacement的实际效果存疑,因为LRU-K相比LRU-K的优势在于能够防止偶发性的、周期性的批量操作会导致LRU命中率急剧下降的问题,但我认为LRU-k的开销实际上是很大的,特别是对访问次数大于k的队列中的节点,理论上每次访问的时间复杂度为O(n), 相比LRU O(1)的开销来说很大。另外,”缓存污染“其实可以通过By-pass来绕过buffer pool manager, 不进行缓存即可。
Task #3 Buffer Pool Manager
这一步就是利用基于extendible hashmap的页表和lru-k replacer来着手构建BPM了。BPM重要的成员函数摘录如下:
class BufferPoolManagerInstance : public BufferPoolManager {
ExtendibleHashTable<page_id_t, frame_id_t> *page_table_;
LRUKReplacer *replacer_;
std::list<frame_id_t> free_list_;
std::shared_mutex latch_;
}
页表page_table_是用来存储page_id到frame_id的映射的,free_list_ 则是保存空闲的frame_id。具体来说,需要查找page_id页时,首先通过页表查找是否在缓冲区中。如果在的话,则为缓冲命中,否则需要通过DishManager从磁盘调入到内存中,如果free_list非空,则直接从free_list里得到frame_id, 否则需要通过lru-k replacer来决定evict的page,如果dirty,还需要阻塞(这里可能可以优化为多线程写入),将脏页同步到磁盘。
另外,FetchPage时会将page的pin_count++, Unpin时会将pin_count–,当pin_count=0是说明没有事务在使用这个page,此时就可以通过设replacer中该page为evictable=true。
并发方面,我一开始是设计了free_list_latch、page_table_latch_以及page自带的latch来进行控制的,在锁page之后立马释放page_table_latch,从而提高性能,花了很多时间实现后在gradescoped上拿了满分,可惜在之后的B+tree index的实验中出现了难以解决的死锁问题,没办法改为了“一把大锁保平安”的方式。(注:我发现好像这个死锁是可以解决的,回头再改回来试试)
Project #2 - B+Tree
Task #1 - B+Tree Pages
实现Internal Page(BPlusTreeInternalPage)和Leaf Page(BPlusTreeLeafPage)的一些简单成员方法,没有什么好说的。注:Internal Page和Leaf Page都是继承自BPlusTreePage,而不是Page。Page的Data字段实际上就是存储BPlusTreePage,这里BUSTUB_PAGE_SIZE = 4096 也就是4K:
class Page {
char data_[BUSTUB_PAGE_SIZE];
}
其实利用BPM Fetch出来的是一个Page,需要通过reinterpret_cast<>(page->Data())才能得到BPlusTreeInternalPage或者BPlusTreeLeafPage,另外,这两个Page有自己的Header,格式不一样,LeafPage Header如下,InternalPage Header与之相比,少了NextPageId这一属性。
* Header format (size in byte, 28 bytes in total):
* ---------------------------------------------------------------------
* | PageType (4) | LSN (4) | CurrentSize (4) | MaxSize (4) |
* ---------------------------------------------------------------------
* -----------------------------------------------
* | ParentPageId (4) | PageId (4) | NextPageId (4)
* -----------------------------------------------
对应着BPlusTreePage的成员变量:
class BPlusTreePage {
IndexPageType page_type_ __attribute__((__unused__));
lsn_t lsn_ __attribute__((__unused__));
int size_ __attribute__((__unused__));
int max_size_ __attribute__((__unused__));
page_id_t parent_page_id_ __attribute__((__unused__));
page_id_t page_id_ __attribute__((__unused__));
}
声明顺序不能改变,不然会reinterpret_cast的结果是错的。
Task #2 - B+Tree Data Structure
目标是实现B+树索引。
insert时,叶子结点和内部节点的分裂的条件有所不同,具体来说,如果插入后叶子结点大于等于maxSize,则节点分裂;如果插入前内部节点前已经大于等于maxSize了,则内部节点需要先分裂,再插入。这样的结果是叶子节点大小能控制在小于等于maxSize-1,而内部节点小于maxSize,为什么这样设计呢?其实我没有想清楚。
remove时,如果remove后节点的size小于minSize了,就需要向邻居节点借一个元素,如果邻居节点的元素也不够,则需要进行合并。先考虑是否能借元素的原因是:如果可以通过借元素来满足size限制,那么借完后就只需要对parent_page的对应key进行更新即可;而合并元素的话,会需要删除parent_page的对应key,开销较大(包括移动元素,以及可能导致parent_page的size约束也不满足)等。
关于remove的这一点我是在实现之后反思的过程中才想明白的,我的实现是优先上左边的邻居,如果左边的邻居存在,则向它借或者合并;否则和左邻居借或者合并。这点需要之后改进。
注:这里的左右邻居的前提是同属同一个parent_page的child。
Task #3 - Index Iterator
实现一个用于遍历B+树索引的迭代器。思路就是由迭代器维护一个Page* 和此时读到数组下标索引,如果索引>=MaxSize了,就通过BPM取下一个Page,如此往复,直到next_page_id= INVALID_PAGE_ID。
对于最后一个Page什么时候UnPin的问题,可以放到析构函数里来解决:
INDEXITERATOR_TYPE::~IndexIterator() {
if (page_ == nullptr) {
return;
}
bpm_->UnpinPage(page_->GetPageId(), false);
}
Task #4 - Concurrent Index
这个任务是实现基于Latch Crabbing的B+树并发访问,感觉难度很大。
思路是在insert或者remove时,获取路径上的节点的写锁,但latch crabbing指出,我们其实不需要一直锁着搜索路径上的节点,而是在“确定该节点一定不会修改”某节点时,及时释放上面的锁。
对于insert来说,如果确定子节点不会分裂,则可以释放锁;对于remove来说,如果确定子节点不会借元素或者合并,则可以释放锁。锁是通过Transaction来记录的。
这里有个隐蔽的问题卡了我很久很久,老是有测例会出现data race的情况。原因是我在remove的borrow或者merge访问邻居节点时,故意没有对邻居节点加锁,因为我认为如果一个节点需要borrow或者merge,那我们一定会持有parent page的锁,那么后来的请求一定是无法访问parent page的子树的,包括邻居节点。但后来我发现这是有问题的,因为这个锁只能保证后来的请求不能访问,但是之前的请求可能还在访问这个节点!所以如果不加锁,就会产生data race。
Latch Crabbing由于保证了从上到下获取锁,所以是无死锁的。
特别需要注意的一点是,FetchPage需要在Page->WLock()前进行,UnpinPage需要在在Page->WUnlock()后进行。
Project #3 - Query Execution
该项目是为每个PlanNode实现对应的执行器,执行需要的所有信息都保存在了PlanNode中,然后通过ExecutorContext来获取事务(暂时未涉及)、catalog的背景信息。
这些都是按照火山模型实现。
Task #1 - Access Method Executors
实现SeqScan、IndexScan、Insert、Delete执行器。
SeqScan是通过从获取TableIterator,从头遍历到尾,同时还要通过filter来进行条件判断。IndexScan类似,不过是通过索引顺序遍历
Insert和Delete我维护了一个called_time,保证一次Next之后都返回false。另外,还需要维护相关索引。
Task #2 - Aggregation & Join Executors
AggregationExcutor是一个阻塞的执行器,通过一哈希表来对元组进行聚合,具体来说,根据group_bys_进行分类,而groups_by_ 是一个vector 。对于一个元组来说,计算得到groups_bys_ 的vector结果后,通过combine Hash可以得到最终的hash, 将此hash作为key, 再计算aggragates_(vector)作为value, 保存在哈希表中。核心的数据结构如下:
struct AggregateKey {
std::vector<Value> group_bys_;
// override == for aggregateKey, for hash table
auto operator==(const AggregateKey &other) const -> bool {
for (uint32_t i = 0; i < other.group_bys_.size(); i++) {
if (group_bys_[i].CompareEquals(other.group_bys_[i]) != CmpBool::CmpTrue) {
return false;
}
}
return true;
}
}
struct AggregateValue {
std::vector<Value> aggregates_;
};
class SimpleAggregationHashTable {
std::unordered_map<AggregateKey, AggregateValue> ht_{};
const std::vector<AbstractExpressionRef> &agg_exprs_;
const std::vector<AggregationType> &agg_types_;
}
class AggregationExecutor {
SimpleAggregationHashTable aht_;
SimpleAggregationHashTable::Iterator aht_iterator_;
}
// define hash function for AggregateKey
namespace std {
template <>
struct hash<bustub::AggregateKey> {
auto operator()(const bustub::AggregateKey &agg_key) const -> std::size_t {
size_t curr_hash = 0;
for (const auto &key : agg_key.group_bys_) {
if (!key.IsNull()) {
curr_hash = bustub::HashUtil::CombineHashes(curr_hash, bustub::HashUtil::HashValue(&key));
}
}
return curr_hash;
}
};
}
JoinExcutor方面,要实现NestedLoopJoin和NestedIndexJoin两种, 每种类别都要实现LeftJoin和InnerJoin两种形式。
对于NestedLoopJoin,内表和外表两层嵌套循环,我在实现上只需要保存outer table的游标Tuple left_tuple_如果outer table跑完了,则结束。对于InnerJoin实现较简单;而LeftJoin在InnerJoin的基础上,还需要在内存循环判断是否找到了合适的inner table的匹配项。如果没有找到,则通过GetRightNullTuple()获取一个空的内表元组与外表元组进行连接。
对于NestedIndexJoin,则是通过内表的B+树索引进行连接,实现起来比NesteLoopJoin要容易,不过Bustub这里只支持BPlusTreeIndexForOneIntegerColumn,也就是以第一列Integer为索引。
Task #3 - Sort + Limit Executors and Top-N Optimization
SortExcutor,这是一个阻塞的执行器。可以通过在Init阶段调用std::sort来实现,并重写排序函数:
std::sort(sorted_tuples_.begin(), sorted_tuples_.end(), [this](const Tuple &t1, const Tuple &t2) {
for (auto [sort_type, predicate] : plan_->GetOrderBy()) {
auto v1 = predicate->Evaluate(&t1, GetOutputSchema());
auto v2 = predicate->Evaluate(&t2, GetOutputSchema());
if (v1.CompareLessThan(v2) == CmpBool::CmpTrue) {
return sort_type != OrderByType::DESC;
}
if (v1.CompareGreaterThan(v2) == CmpBool::CmpTrue) {
return sort_type == OrderByType::DESC;
}
});
Limit Executors非常简单,计数输出即可。
Top-N Optimization是为了对Sort+Limit组合算子进行优化,形成一个TopN算子。这里涉及到优化器的内容,先看看优化器的实现原理:
auto Optimizer::Optimize(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef {
if (force_starter_rule_) {
// Use starter rules when `force_starter_rule_` is set to true.
auto p = plan;
p = OptimizeMergeProjection(p);
p = OptimizeMergeFilterNLJ(p);
p = OptimizeNLJAsIndexJoin(p);
p = OptimizeOrderByAsIndexScan(p);
p = OptimizeSortLimitAsTopN(p);
return p;
}
// By default, use user-defined rules.
return OptimizeCustom(plan);
}
也就是对根节点依次运用规则进行优化,包括消除多余的projection节点、将笛卡尔积+条件筛选转化为带条件的NLJ)、将NLJ优化为IndexJoin或者HashJoin、将OrderBy优化为IndexScan等。
我们要实现的是将Sort+Limit转化为TopN。实现起来非常简单,先对表达式树进行后序遍历,然后判断该节点的子节点是不是Limit且子节点为Sort,如果是,则合并为一个TopN节点。
接下来就是TopN Executor的实现,其实这是一个堆的经典应用问题。方法就是维护一个大小为N的小根堆,每次进来一个元素X,就和堆顶的元素进行大小比较,如果比堆顶大,就pop掉堆顶元素,然后将X push到堆中;这样最终就能得到序列中最大的N个元素,时间复杂度为O(n)。
Project #4 - Concurrency Control
这个project是我花时间最长的一个project,因为不知道如何下手。所以我先看了帆船书中的lock manager章节,可以说,bustub的这个锁管理器就是按照帆船书实现的。看完后再参照了大佬的博客,才知道具体该如何做,还是太菜了:( 。
Task #1 Lock Manager
要实现一个帆船书中的lock manager。并基于此,完善Transaction Manager的实现。Transaction Manager已经实现了Abort\Commit等方法。
我们需要实现的就是lock_manager.cpp中的LockTable、LockRow、UnlockTable、UnLockRow。
对于加锁的过程,可以总结如下:
- 是否可以加锁
- 2PL的限制:如SHRINKING阶段,RR隔离级别下不允许新加任何锁,RC隔离级别下不允许加新的X锁;)
- 行锁:如果要加行锁,需要先判断table上时候有更高级的锁,比如要对行加S锁,就需要先获得所在table的S\X\SIX\IS\IX锁(这个地方我实现的时候没写全,导致gradescope死活过不了,我花了一天时间没发现这个低级错误,最后通过打印日志发现了问题)
- 获取加锁对象的LockRequestQueue
- 尝试加锁,这里根据锁队列中是否已经存在该transaction的锁请求可以分为“锁升级”和普通加锁的情况:
- 如果锁队列中发现已经该事务获得过锁,则是锁升级,首先需要判断是不是合法的锁升级,比如S锁只能升级为SIX/X。(PS:对于锁升级,我曾经疑惑,为什么X锁不能“倒着升级”为S锁呢?答:首先,“升级”成更弱的锁本身就是有问题的,应为升级意味着需要放弃原来的锁,2PL中,如果放弃X锁,那意味着可以出现脏读的问题。另外,Discord里也有人讨论了downgrade的问题,TA的回答是不需要实现,难道真的有lock downgrade吗?)。upgrade的过程是:将原来的锁请求从队列中删除,然后放在当前第一个not granted的请求前面!这点很重要,这么做的原因,后面我会详细展开。
- 如果该事务没有获取过锁,那就在队列末尾新加一个锁请求。
- 判断是否能获取锁,如果不能获取锁,则睡眠在condition varible上,等待前面的请求unlock时唤醒,如此循环,直到获取锁或者被(deadlock detector强制) Abort为止。这个部分代码很简单,但是我认为是lock manager最核心的一部分,代码放在下面做参考,按照这种实现,就能保证锁的顺序基本是FIFO,但是也允许并发地获取,比如多个S请求,可以同时获取锁,而不受严格的FIFO顺序限制。
- 如果获取锁成功,则需要维护事务的锁集合。
// 循环判断条件
while (!this->GrantRequest(new_req->txn_id_, new_req->lock_mode_)) {
this->cv_.wait(target_lock);
...
}
// 判断是否能获取锁
auto LockManager::LockRequestQueue::GrantRequest(txn_id_t id, LockMode lock_mode) -> bool {
for (auto const &req : request_queue_) {
if (req->txn_id_ == id) {
BUSTUB_ENSURE(!req->granted_, "GrantRequest: already granted!");
req->granted_ = true;
return true;
}
if (!IsCompatible(lock_mode, req->lock_mode_)) {
return false;
}
}
UNREACHABLE("GrantRequest Error!");
}
以上就是加锁的流程,具体实现上各种都是大同小异,我一共实现2版,区别在于如何判断一个锁请求是否是一个upgrade请求。我最开始是通过直接查询lock_set来判断是否该事务已经持有锁的,这样,对于不需要upgrade的锁请求,就不需要遍历lock_queue了,直接append在最后即可。但是后来我想,遍历lock_queue链表的开销相比去查询lock_set来说,应该很小,所以这应该是over-optimize了。所以我后来又改了一版,直接遍历lock_queue, 如果没找到对应transaction id的,就说明不是upgrade,append到后面就好了。
Q:为什么同时只能有一个upgrade请求?
A:首先,其实可以通过新增一条upgrade队列来允许多个请求同时升级(discord中有讨论)。如果按照bustub现在的框架,只有一条队列,比如在RR隔离级别下,如果发生S到X的锁升级,那么需要先放弃S锁,并作为第一个未granted的请求放在队列前部,保证第一个获取X锁;那如果这时候又有个新的upgrade请求(也是从S升级到X)出现,那么这个新的请求就会被放在更前面,也就是说有更高的优先级,它会第一个获取X锁,并修改内容,这样就破坏了前一个事务的Repeatable Read的保证。
对于解锁的流程,很简单,如下:
- 如果是表锁,先判断是否解锁。如果事务还持有表下面的行锁,那这就是不允许的。
- 遍历request_queue,看有没有持有锁。
- 如果没有持有锁,则throw一个异常;
- 如果有锁,则将该请求删除,根据isolation level来更新State,解锁成功!
Q:事务的锁集合有必要存在吗?
A:也可以不要,但是会造成性能上的损失。不维护锁集合的话,在判断一个事务是否持有对于一个对象的锁时,需要遍历request队列,这会造成锁的大量争用,对于存在热点数据时,性能损失会很严重。
Task #2 DeadLockManager
最开始我以为这个部分是最难的。最后发现这个部分用的时间最少,原因是目标很明确,任务也很简单——根据整个lock_manager维护的所有lock_queue来建一个有向图,然后用DFS来判断是否有环。如果有环的话就将transaction_id最大的事务(也就是youngest的事务) abort掉,破坏环。
值得注意的是,后台deadlock detection线程每隔一个interval苏醒后,会建一个完整的图。这个图中可能有多个环。bustub是要求按照transaction_id从小开始来进行dfs搜索,每搜到一个环,就abort掉该环中yongest的事务,并更新图,如此往复,直到破掉所有的环,才会继续休眠。
Task #3 - Concurrent Query Execution
该任务是修改上层的executor,在合适的地方Lock和Unlock,来保证各种隔离级别。具体来说,要修改insert\delete\seqscan三个算子。一开始,我觉得insert和delete直接给表加X锁,seqscan直接给表加S锁就好了。结果通过不了测例,后来按照discord里说的,insert和delete给表加IX锁,insert对insert后的数据加行锁,delete也是。seqscan的话对表加IS锁,然后每读一个row前都先尝试给row加读锁,读到后就能unlock了。这种细粒度的加锁方式是为了提高并发度。
注意,框架的insert有点问题。这个在discord上也有人提出了。就是TableHeap::InsetTuple这个函数内部没有加锁,导致只能先物理上InsertTuple才加X锁,这是框架的缺陷,TA说这是一个long term bug(用2PL不能解决?),可能在后面通过MVCC来避免。我个人感觉只要在InsertTuple内部加上对应的逻辑就好了,没那么复杂?
另外,在这部分的框架设计上,个人认为bustub也不够优雅。因为在executor部分加锁时,还需要加入判断是否持有更高级的锁的逻辑。我认为这些逻辑完全可以下移到LockTable和LockRow中,LockTable和LockRow的行为就变成:如果我已经获取了更高级别的锁,可以直接返回true。将transaction持有的锁集合对上层透明化。
about rollback
并发控制中其实有很重要的两个函数Commit和Abort,框架都帮我们实现好了,降低了实现难度,但我觉得还是有必要关注一下。特别是Abort,涉及到事务操作的roll back。bustub中是如何实现roll back的呢?答案是每个事务维护一个write_set,对于Insert\Delete\Update操作都记录下必要信息(WriteRecord),便于Undo。
记录信息的过程框架已经实现了,比如在insert_tuple里有:
// Update the transaction's write set.
txn->GetWriteSet()->emplace_back(*rid, WType::INSERT, Tuple{}, this);
write_set是一共deque,其实这里就是用作一个stack。
Abort中回滚的逻辑如下:
// Undo the writes
while (!table_write_set->empty()) {
auto &item = table_write_set->back();
auto *table = item.table_;
if (item.wtype_ == WType::DELETE) {
table->RollbackDelete(item.rid_, txn);
} else if (item.wtype_ == WType::INSERT) {
// Note that this also releases the lock when holding the page latch.
table->ApplyDelete(item.rid_, txn);
} else if (item.wtype_ == WType::UPDATE) {
table->UpdateTuple(item.tuple_, item.rid_, txn);
}
table_write_set->pop_back();
}
table_write_set->clear();
当然,index的更新也得回滚,此处省略了。
值得注意的是,bustub并没有支持故障恢复。如果要做故障恢复,那这些WriteRecord不止要在内存里,还得写到磁盘中的log日志中;commit和abort也要写日志。
茶余饭后
今天是Oceanbase数据库比赛的东北大学宣讲专场,有两位本校学长回来宣讲,两位都是先入职的华为,然后跳槽到了OB。从他们口中得知,Oceanbase的工作条件很好,公司主张的是“信任”的文化,没有打卡制度,员工们一般九十点到,晚上七八点就下班,如果八点走,还能报销打车费,另外,周末不加班!看着真不错。不过应该门槛比较高,听学长说本科去工作或者实习的都比较少,原因就是本科生对于某个领域,知识积累得不够吧。所以学长给我的建议是,看准一个方向一个岗位,然后去深入了解,对口会让工作好找些。
至于技术上(应该说是行业了解方面)学到的东西,我了解到了OB和polardb在定位上的不同,polardb是云原生的,而OB是面向金融行业的(起码最开始是的),所以和云的关系不大,银行都是用自己的服务器。OB最开始是kv数据库,后面发展为了支持SQL的数据库,核心的技术点是通过paxos实现容灾。