咸鱼回响

望之天回,即之云昏

0%

LevelDB学习:增加/删除/查询

本篇文章主要查看在LevelDB中增删查操作是如何进行的,由于LevelDB在执行这些操作的时候会运行很多后台任务或者其他一些细节操作,但这里只进行到主流程为止,其他具体的细节操作如:调度压缩、日志记录、memtable生效等操作放在后面学习。

查询

查询操作在LevelDB中的接口为:

1
virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) = 0;

三个参数分别为:

  • options:读取参数配置,分别配置读取数据时是否执行校验、读取数据后是否放置在缓存中、是否从给定快照中读取
  • key:要查询的key
  • value:查询的结果

返回结果表示本次查询的状态。注意key的Slice类型,这是在LevelDB中自定义的对字符串的封装方式,可以直接将其看成string.

第一段代码 - 初始化查询版本

1
2
3
4
5
6
7
8
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot = static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

这里新建了返回结果Status、锁对象以及SequenceNumber。然后判断options中是否提供了快照,如果有,则本次查询将根据快照的版本进行查询,否则使用默认的最新版本。
从这里我们可以看到,在查询的过程中可以指定sequence number,也就是说LevelDB的查询是可以查询指定snapshot的数据的,这对数据的版本管理提出了严格的要求。而MutexLock的创建则是对该操作进行加锁,以保证获取的versions_的sequence number是顺序正确的。

MutexLock在创建的时候自动加锁

1
2
3
explicit MutexLock(port::Mutex* mu) EXCLUSIVE_LOCK_FUNCTION(mu) : mu_(mu) {
this->mu_->Lock();
}

第二段代码 - 增加内存表引用计数

1
2
3
4
5
6
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();

前面的mem_是内存表,即memtable。后面的imm_表示的是正在压缩中的内存表,这个内存表就是即将要持久化到磁盘中的sstable。
获取当前version_的版本,给两张内存表的引用计数加一。
这里的引用计数并不是类与类之间的引用,而是某个线程在执行过程中的引用,即引用计数代表着每个内存表被多少个线程同时使用中。这个可以方便内存表进行持久化的时候判断是有线程在修改。

第三段代码 - 创建返回状态

1
2
bool have_stat_update = false;
Version::GetStats stats;

这两个主要在对内存表查询失败后针对文件进行查询的状态,如果针对文件查询失败,则填充stats

第四段代码 - 执行查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}

这段代码最开始备注的是“从文件和内存表中读取时释放锁”,也就是说,加锁的部分主要在对一次操作进行时获取版本信息这部分过程,这是为了让每个操作都能够得到正确的version以及SequenceNumber等信息。
在查询信息时,首先从内存表以及不可变内存表中查询,查询时会将SequenceNumber与key合并成专门用于查询的LookupKey对象用于查询。如果都找不到则从文件中查询,如果还是查询失败则填充stats对象并修改have_stat_update状态。
最后再加锁。

为什么在查询的时候不加锁,不怕脏读吗?

第五段代码 - 释放引用计数、执行压缩

1
2
3
4
5
6
7
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;

在确认从文件中获取到数据后执行调度压缩(?这是为了什么呢),然后释放内存表与version_的引用计数并返回结果。
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}

首先通过断言检查当前线程是否持有互斥锁。
然后在if语句中先判断后台压缩是否正在进行(background_compaction_scheduled),如果存在则表示已经在压缩中,不需要做任何操作。
如果当前内存正在释放,则表示正在删除数据库。不会有更多的背景压缩。
如果数据库开始时就出错,则不会有更多变化。
如果imm_为空,则表示当前没有正在压缩中的内存表,且对于version_来说并不需要进行压缩,因此不做任何事。
最后,当所有条件都没有满足的情况下,执行压缩。此时调用的Schedule是根据当前文件系统实现的,该方法的定义是在后台运行一次输入函数,而在这里输入的函数为DBImpk::BGWork.背景压缩的主要功能就是将内存表压缩为磁盘组件。这个在后面介绍“memtable如何转变为sstable”再介绍。这里先记住执行这个功能的函数为:DBImpl::BackgroundCompaction

当查询没有命中内存表后会尝试执行背景压缩,这表明当前执行查询的业务数据已经转化为磁盘数据,内存表作为缓存的作用被削弱,为了减少内存开支需要将其持久化到磁盘。

添加

添加有两个方法:Put与Write,两者的区别在于Write属于批量添加,Put属于单个添加。但为了保证添加数据的一致性,本质上Put就是一条数据的批量添加,这在Put的实现中也表现如此:

1
2
3
4
5
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

第一段代码 - 初始化写入者(Writer)

1
2
3
4
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

创建一个写入者,并将本批次需要写入的数据赋值给它。同时将mutex_指针传递给写入者,但是这里并没有加锁,进去查看源码可以发现Writer将mutex_指针传递给它的一个CondVar对象。这个对象也只是保留该指针而已。

1
explicit CondVar(Mutex* mu) : mu_(mu) { assert(mu != nullptr); }

这个对象LevelDB给出的解释为:简单地包装stf::condition_variable,这是一个条件变量.

第二段代码 - 暂停之前的写入任务

1
2
3
4
5
6
7
8
9
10
11
12
//创建锁
MutexLock l(&mutex_);
//将写入任务压入写入队列
writers_.push_back(&w);
//等待之前没完成的任务完成
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
//如果w已经写入完成,则直接返回
return w.status;
}

这里开始就先加锁,将写入者加入写入者队列中,如果当前写入者没有完成并且该队列中的第一个元素并不是当前写入者,则使得当前写入者的条件变量处于wait状态,从代码中的注释中看,该方法所执行的功能为:原子地释放*mu 并阻塞此条件变量,直到调用SingnalAll或者调用Singnal以唤醒该线程,前提是该线程池有*mu。这里的*mu就是第一段代码中传入的&mutex_指针。这里的作用就是停止当前写入者的条件变量并释放相关资源。如果在这之前写入者已经写入完成,则返回写入的状态。

第三段代码 - 写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// May temporarily unlock and wait.
//暂时解锁并等待
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
//添加日志并应用与memtable。
//我们可以在这个阶段释放锁,因为 &w 目前负责记录日志并防止并发记录器和并发写入 mem_。
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
//写入日志
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
//写入日志成功后才能在memtable中生效
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
//写入日志失败,将这次失败记录
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}

这一段比较长,不过条理还是很清晰的,加上原文的注解也是很容易理解。
首先是解除锁并等待,这里调用的函数为MakeRoomForWrite,调用这个函数的前提是当前线程持有mutex_并且处于写入者队列的首位,这里主要的作用就是为后续的写入腾出空间。后续再进行研究,现在先关注写入的主要流程。
腾出空间后获取当前最新的版本号,然后将现在的要写入者设置为最后一个写入者。当腾出空间完成并且传入有数据的时候,根据数据创建批处理对象组,即WriteBatch,将最新编号作为该批次的编号并返回批处理的条目数量。
添加日志并应用于memtable,在这一步可以释放锁,那么在写入阶段需要处理并发的只有获取处理编号这里为了保证编号的唯一与顺序的正确,这一步是为了后续进行恢复的时候数据写入顺序的一致性。然后在写入日志与在memtable中生效这步居然释放了锁,这是为什么?
写入完成后保存结果并更新版本编号,这一步又重新加锁。

第四段代码 - 唤醒未完成的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;

这一步主要将本次写入之前的所有写入者唤醒。如果真的存在本次写入者之前的写入者没有完成的话,则唤起写入队列。

删除:

查看源码可以发现,删除本质还是一条WriteBatch指令,这也恰好符合LSM-Tree的特征,其本质是一个追加写入的日志文件系统,在后面可以看到,删除操作主要发生在rolling-merge阶段。

1
2
3
4
5
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);
return Write(opt, &batch);
}