LevelDB之读写流程

PUT

LevelDB的写入性能较高,整体步骤就是先写wal日志再写入memtable中,由于wal日志是append写入的,性能较高,所以写入一般不存在瓶颈。但是如果写入速度过快,导致memtable来不及flush或者lelvel 0中的文件数较多时,系统的写入可能会被限制,这就是LevelDB的write stall问题,整体写入流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
struct DBImpl::Writer {
Status status; // 写入结果
WriteBatch* batch;
bool sync; //本次写入对应的log是否立刻刷盘
bool done; //本次写入是否完成
port::CondVar cv; //条件锁,会被之前的写入线程唤醒

explicit Writer(port::Mutex* mu) : cv(mu) { }
};

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);

//先加锁,保证只有一个线程能写入,在开始写log和memtable的时候会释放
MutexLock l(&mutex_);
//加入等待队列中排队
writers_.push_back(&w);
//如果此次写入没有被执行且不处于队列头部,则等待之前的写入完成
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
//如果此次写入已经执行了,则直接返回
//其他写入请求在写入时可能会将队列中的写入请求一起取出合并写入DB中
if (w.done) {
return w.status;
}

//写入前的检查,memtable、wal日志、compaction等,可能会阻塞住此次写入
Status status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) {
//合并队列里的多个写入
WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);

// 写入wal日志和memtable,写入前可以先释放锁,让后续write请求进入队列中排队,
//这样也能保证写入时互斥的,因为此时已经确定是只有w来负责写入 ,这样可以减小互斥时间
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
//更新versions时需要加锁,确保last_sequence正确修改
mutex_.Lock();
if (sync_error) {
//wal日志sync失败
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

//修改合并写入请求的状态
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// 唤醒队列头部的请求
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}

GET

LevelDB支持获取,某个key的指定版本对应的值,如果没有指定版本的话,则获取最新的kv对,整体流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
//获取快照和version时先加锁,保证相关引用的正确修改
MutexLock l(&mutex_);
SequenceNumber snapshot;
//获取相关sequenceNUmber
if (options.snapshot != NULL) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
} else {
snapshot = versions_->LastSequence();
}

MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
//memtable引用加1
mem->Ref();
if (imm != NULL) imm->Ref();
//version引用加1
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;

//ref已经设置完成,可以释放锁了
{
mutex_.Unlock();
// 在memtable中查找
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// 在imm中查找
} else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done
} else {
//通过version查找,一个version代表DB中sstable文件的一个状态
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
//确保只有一个线程修改引用
mutex_.Lock();
}

if (have_stat_update && current->UpdateStats(stats)) {
//如果扫描次数过多可能会触发compaction
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != NULL) imm->Unref();
current->Unref();
return s;
}

其中要注意的是,要通过锁来确保只有一个线程来修改相关的引用计数,修改完引用后可以释放锁,即读取memtable和sstable不是互斥的。