The ceph-mon Paxos Implementation

Introduction

The Monitor is the metadata management component of a Ceph cluster. It maintains the cluster's metadata, which mainly consists of a few abstract maps, including the osdmap, pgmap, monmap, mdsmap, and so on. Ceph's OSDs were designed as INTELLIGENT STORAGE DEVICES; these smart OSDs offload much of the burden that the Monitor would otherwise carry as a central node. In practice, the Monitor really only needs to get involved in the following situations:

  • On first access, a client fetches the cluster's crush map from the Monitor
  • OSD nodes report failed OSDs to the Monitor, which must update the corresponding map
  • When an OSD joins the cluster, it notifies the Monitor, which must update the corresponding map

Overall, the Monitor builds on a modified Paxos algorithm to provide consistent access to, and updates of, this metadata.

PAXOS

As shown in the figure above, the Monitor's overall architecture consists of four layers: the DB layer, the Paxos layer, the PaxosService layer, and the application layer. The DB layer provides single-node KV storage. Each PaxosService wraps the application layer's metadata operations into KV operations and hands them down to the Paxos layer, which provides a consistent KV storage service to the layers above; all of the PaxosServices share a single Paxos instance. Let us look at how the Paxos layer builds on the modified Paxos algorithm to provide this service.
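To make the layering concrete first, here is a toy model of how an application-level update flows down through the layers. The types below (Transaction, Paxos, OSDMonitorLike) are simplified stand-ins, not Ceph's actual classes; a minimal sketch only:

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in for MonitorDBStore::Transaction: a batch of KV puts.
struct Transaction {
  std::vector<std::pair<std::string, std::string>> puts;
  void put(const std::string& k, const std::string& v) { puts.emplace_back(k, v); }
};

// Stand-in for the Paxos layer sitting on top of the DB (KV) layer.
struct Paxos {
  std::map<std::string, std::string> db;   // stand-in for the DB layer
  void propose(const Transaction& t) {
    // The real Paxos runs begin/accept/commit across the quorum first;
    // here we just apply the transaction to the local store.
    for (const auto& kv : t.puts) db[kv.first] = kv.second;
  }
};

// Stand-in for a PaxosService such as the OSDMonitor.
struct OSDMonitorLike {
  Paxos& paxos;                             // all services share one instance
  void update_osdmap(int epoch, const std::string& encoded_map) {
    Transaction t;
    t.put("osdmap:" + std::to_string(epoch), encoded_map); // map update as KV ops
    paxos.propose(t);                       // one decided value per round
  }
};

int main() {
  Paxos paxos;
  OSDMonitorLike osdmon{paxos};
  osdmon.update_osdmap(42, "<encoded osdmap>");
  std::cout << paxos.db.count("osdmap:42") << "\n"; // prints 1
}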

There are two main roles in a Monitor cluster: leader and peon. Only the leader can make proposals, i.e. the leader plays the role of the Paxos proposer, while both the leader and the peons act as acceptors. As long as more than half of the Monitor nodes are alive, the Monitor can serve requests normally. Setting aside leader election and failure recovery for now, let us walk through the source code to see how the Paxos layer works in a normally running Monitor cluster.

A Paxos instance can be in one of the following states:

  • recovering: recovery state, used to synchronize data among the instances after an election
  • active: idle; a new Paxos round may be started
  • updating: a proposal has been made and is going through the accept/commit flow
  • updating-previous: like updating, but for an uncommitted value found during recovery
  • writing: the accepted proposal is being written
  • writing-previous: like writing, but for a proposal being written during recovery
  • refresh: waiting to refresh; the instance returns to active once the lease is obtained
  • shutdown: error state
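A minimal sketch of these states as an enum, purely to mirror the list above (the real Paxos class defines equivalent STATE_* constants; the comments here paraphrase the descriptions):

// Illustrative mirror of the Paxos instance states described above.
enum class PaxosState {
  recovering,          // syncing instances after an election
  active,              // idle; a new paxos round may start
  updating,            // a proposal is being accepted/committed
  updating_previous,   // accept phase for an uncommitted value found in recovery
  writing,             // the accepted proposal is being written
  writing_previous,    // writing a proposal found during recovery
  refresh,             // waiting to refresh; active again once the lease arrives
  shutdown             // error state
};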

The normal operation of a Paxos instance mainly involves the following data:

private:
  // lowest committed version stored on this instance
  version_t first_committed;

  // proposal number of the last proposal; the pn does not change during
  // a leader's term
  version_t last_pn;

  // highest committed version stored on this instance
  version_t last_committed;

  // highest proposal number accepted so far
  version_t accepted_pn;

  // Paxos records the proposed value as a Transaction in pending_proposal;
  // a Transaction records the series of operations making up the proposal.
  // This form also shows that the Paxos layer decides only one proposal
  // at a time.
  /**
   * Pending proposal transaction
   *
   * This is the transaction that is under construction and pending
   * proposal. We will add operations to it until we decide it is
   * time to start a paxos round.
   */
  MonitorDBStore::TransactionRef pending_proposal;

  // pending_finishers temporarily holds the callbacks to run once the
  // proposal has been accepted
  /**
   * Finishers for pending transaction
   *
   * These are waiting for updates in the pending proposal/transaction
   * to be committed.
   */
  list<Context*> pending_finishers;

  // once a paxos round starts, committing_finishers holds the callbacks
  // to run once the proposal is chosen
  /**
   * Finishers for committing transaction
   *
   * When the pending_proposal is submitted, pending_finishers move to
   * this list. When it commits, these finishers are notified.
   */
  list<Context*> committing_finishers;

The entry point for making a proposal is trigger_propose:

bool Paxos::trigger_propose()
{
  if (plugged) {
    dout(10) << __func__ << " plugged, not proposing now" << dendl;
    return false;
  } else if (is_active()) {
    dout(10) << __func__ << " active, proposing now" << dendl;
    propose_pending();
    return true;
  } else {
    dout(10) << __func__ << " not active, will propose later" << dendl;
    return false;
  }
}

void Paxos::propose_pending()
{
  assert(is_active());
  assert(pending_proposal);

  cancel_events();

  bufferlist bl;
  pending_proposal->encode(bl);

  dout(10) << __func__ << " " << (last_committed + 1)
           << " " << bl.length() << " bytes" << dendl;
  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  pending_proposal->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  pending_proposal.reset();

  committing_finishers.swap(pending_finishers);
  state = STATE_UPDATING;
  begin(bl);
}

As the code above shows, a proposal is first stored durably in the DB; this guarantees that once a proposal has been made, its data survives even if the Paxos instance crashes. When a Paxos round starts, the proposal is read back from the DB and the decision process begins.
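This write-ahead idea can be sketched with a toy store: the value is persisted under key last_committed+1 together with the "pending_v"/"pending_pn" bookkeeping before any message goes out, so a restart can recover the in-flight proposal. ToyStore and persist_proposal below are illustrative stand-ins, not Ceph's MonitorDBStore API:

#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Toy model of persisting a proposal before starting the round, mirroring
// what begin() does with "pending_v"/"pending_pn". Illustrative only.
struct ToyStore {
  std::map<std::string, std::string> kv;
  void put(const std::string& k, const std::string& v) { kv[k] = v; }
  std::optional<std::string> get(const std::string& k) const {
    auto it = kv.find(k);
    if (it == kv.end()) return std::nullopt;
    return it->second;
  }
};

void persist_proposal(ToyStore& store, uint64_t last_committed,
                      uint64_t accepted_pn, const std::string& value) {
  uint64_t v = last_committed + 1;
  store.put(std::to_string(v), value);                  // the proposed value itself
  store.put("pending_v", std::to_string(v));            // which version it is for
  store.put("pending_pn", std::to_string(accepted_pn)); // which pn proposed it
  // Only after this durable write does the leader send OP_BEGIN; if it
  // crashes here, the recovery flow can find and re-propose the pending value.
}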

begin(leader)

// leader
void Paxos::begin(bufferlist& v)
{
  dout(10) << "begin for " << last_committed+1 << " "
           << v.length() << " bytes"
           << dendl;

  // only the leader can make proposals, and only one proposal can be
  // decided at a time
  assert(mon->is_leader());
  assert(is_updating() || is_updating_previous());

  // we must already have a majority for this to work.
  assert(mon->get_quorum().size() == 1 ||
         num_last > (unsigned)mon->monmap->size()/2);

  // and no value, yet.
  assert(new_value.length() == 0);

  // accept it ourselves
  accepted.clear();
  // insert our own rank into the accepted set; we never reject our own
  // proposal
  accepted.insert(mon->rank);
  new_value = v;

  // this is the very first proposal
  if (last_committed == 0) {
    auto t(std::make_shared<MonitorDBStore::Transaction>());
    // initial base case; set first_committed too
    t->put(get_name(), "first_committed", 1);
    decode_append_transaction(t, new_value);

    bufferlist tx_bl;
    t->encode(tx_bl);

    new_value = tx_bl;
  }

  // store the proposed value in the store. IF it is accepted, we will then
  // have to decode it into a transaction and apply it.
  // store this proposal in the DB, keyed by its version number, with the
  // proposal's value as the value; once the proposal is chosen, Paxos
  // reads it back and applies it
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->put(get_name(), last_committed+1, new_value);

  // note which pn this pending value is for.
  // record the bookkeeping for the proposal currently being decided
  t->put(get_name(), "pending_v", last_committed + 1);
  t->put(get_name(), "pending_pn", accepted_pn);

  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  t->dump(&f);
  f.flush(*_dout);
  auto debug_tx(std::make_shared<MonitorDBStore::Transaction>());
  bufferlist::iterator new_value_it = new_value.begin();
  debug_tx->decode(new_value_it);
  debug_tx->dump(&f);
  *_dout << "\nbl dump:\n";
  f.flush(*_dout);
  *_dout << dendl;

  logger->inc(l_paxos_begin);
  logger->inc(l_paxos_begin_keys, t->get_keys());
  logger->inc(l_paxos_begin_bytes, t->get_bytes());
  utime_t start = ceph_clock_now();

  // persist the proposal in the DB
  get_store()->apply_transaction(t);

  utime_t end = ceph_clock_now();
  logger->tinc(l_paxos_begin_latency, end - start);

  assert(g_conf->paxos_kill_at != 3);

  if (mon->get_quorum().size() == 1) {
    // we're alone, take it easy
    // if the quorum contains only one monitor, the proposal is chosen
    // immediately and we start committing it
    commit_start();
    return;
  }

  // ask others to accept it too!
  // send the proposal to every other member of the quorum (the peons)
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue; // that member is ourselves

    dout(10) << " sending begin to mon." << *p << dendl;
    MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
                                     ceph_clock_now());
    begin->values[last_committed+1] = new_value;
    begin->last_committed = last_committed;
    begin->pn = accepted_pn;
    // last_committed and accepted_pn go along so the peon can decide
    // whether to accept this proposal; a leader's pn never changes
    // during its term
    mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
  }

  // set timeout event
  // register the proposal-timeout callback
  accept_timeout_event = mon->timer.add_event_after(
    g_conf->mon_accept_timeout_factor * g_conf->mon_lease,
    new C_MonContext(mon, [this](int r) {
        if (r == -ECANCELED)
          return;
        accept_timeout();
      }));
}

handle_begin(peon)

// peon: handle the begin message sent by the leader
void Paxos::handle_begin(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_begin");
  MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_begin " << *begin << dendl;

  // can we accept this?
  // if we have already accepted a proposal with a higher pn, reject this
  // one; since a leader's pn never changes during its term, this usually
  // means the leader has changed
  if (begin->pn < accepted_pn) {
    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
    op->mark_paxos_event("have higher pn, ignore");
    return;
  }
  assert(begin->pn == accepted_pn);
  assert(begin->last_committed == last_committed);

  assert(g_conf->paxos_kill_at != 4);

  logger->inc(l_paxos_begin);

  // set state.
  // move this paxos instance into the decision phase
  state = STATE_UPDATING;
  lease_expire = utime_t(); // cancel lease

  // accept the proposal
  version_t v = last_committed+1;
  dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
  // store the accepted value onto our store. We will have to decode it and
  // apply its transaction once we receive permission to commit.
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->put(get_name(), v, begin->values[v]);

  // note which pn this pending value is for.
  // record the version and proposal number of this proposal
  t->put(get_name(), "pending_v", v);
  t->put(get_name(), "pending_pn", accepted_pn);

  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  t->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  logger->inc(l_paxos_begin_bytes, t->get_bytes());
  utime_t start = ceph_clock_now();

  // persist the proposed value on the peon
  get_store()->apply_transaction(t);

  utime_t end = ceph_clock_now();
  logger->tinc(l_paxos_begin_latency, end - start);

  assert(g_conf->paxos_kill_at != 5);

  // reply
  // send the accept message back to the leader
  MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
                                    ceph_clock_now());
  accept->pn = accepted_pn;
  accept->last_committed = last_committed;
  begin->get_connection()->send_message(accept);
}

handle_accept(leader)

// leader: handle the accept messages sent back by the peons
void Paxos::handle_accept(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_accept");
  MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_accept " << *accept << dendl;
  int from = accept->get_source().num();

  // this message dates from a previous leader's term; ignore it
  if (accept->pn != accepted_pn) {
    // we accepted a higher pn, from some other leader
    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
    op->mark_paxos_event("have higher pn, ignore");
    return;
  }
  // this message is stale; ignore it
  if (last_committed > 0 &&
      accept->last_committed < last_committed-1) {
    dout(10) << " this is from an old round, ignoring" << dendl;
    op->mark_paxos_event("old round, ignore");
    return;
  }

  // the accept must belong either to the current round (the value has
  // not committed yet) or to the round that has just committed locally
  assert(accept->last_committed == last_committed || // not committed
         accept->last_committed == last_committed-1); // committed

  // only one proposal can be under decision at a time
  assert(is_updating() || is_updating_previous());
  assert(accepted.count(from) == 0);
  // this peon has accepted the proposal
  accepted.insert(from);
  dout(10) << " now " << accepted << " have accepted" << dendl;

  assert(g_conf->paxos_kill_at != 6);

  // only commit (and expose committed state) when we get *all* quorum
  // members to accept. otherwise, they may still be sharing the now
  // stale state.
  // FIXME: we can improve this with an additional lease revocation message
  // that doesn't block for the persist.
  // unlike standard paxos, the Monitor requires *all* quorum members to
  // accept before a proposal counts as chosen. This simplifies the paxos
  // implementation, but when a mon node fails the paxos service is briefly
  // unavailable until a new quorum forms; since only one proposal is ever
  // in flight, forming the new quorum is quick.
  // once the proposal is chosen, the commit phase starts
  if (accepted == mon->get_quorum()) {
    // yay, commit!
    dout(10) << " got majority, committing, done with update" << dendl;
    op->mark_paxos_event("commit_start");
    commit_start();
  }
}
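The commit condition is worth contrasting with textbook Paxos, which needs only a majority of acceptors; here the full quorum must accept. A tiny sketch (illustrative, not Ceph's types):

#include <set>

// Textbook multi-paxos commits once a majority of acceptors accept;
// ceph-mon's variant (handle_accept above) waits for the full quorum.
bool majority_would_commit(const std::set<int>& accepted,
                           const std::set<int>& quorum) {
  return accepted.size() > quorum.size() / 2;
}

bool ceph_mon_commits(const std::set<int>& accepted,
                      const std::set<int>& quorum) {
  return accepted == quorum;  // every member of the current quorum accepted
}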

commit_start(leader)

// leader: start the commit phase for the proposal
void Paxos::commit_start()
{
  dout(10) << __func__ << " " << (last_committed+1) << dendl;

  assert(g_conf->paxos_kill_at != 7);

  auto t(std::make_shared<MonitorDBStore::Transaction>());

  // commit locally
  // bump the local last_committed to the new proposal's version
  t->put(get_name(), "last_committed", last_committed + 1);

  // decode the value and apply its transaction to the store.
  // this value can now be read from last_committed.
  decode_append_transaction(t, new_value);

  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  t->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  logger->inc(l_paxos_commit);
  logger->inc(l_paxos_commit_keys, t->get_keys());
  logger->inc(l_paxos_commit_bytes, t->get_bytes());
  commit_start_stamp = ceph_clock_now();

  // apply the proposal's transaction to the local DB; once the local
  // commit completes, the C_Committed callback invokes commit_finish
  get_store()->queue_transaction(t, new C_Committed(this));

  // move the leader to STATE_WRITING_PREVIOUS or STATE_WRITING
  if (is_updating_previous())
    state = STATE_WRITING_PREVIOUS;
  else if (is_updating())
    state = STATE_WRITING;
  else
    ceph_abort();
  ++commits_started;

  if (mon->get_quorum().size() > 1) {
    // cancel timeout event
    mon->timer.cancel_event(accept_timeout_event);
    accept_timeout_event = 0;
  }
}

commit_finish(leader)

// leader: propagate the commit to the peons and refresh the leader's state
void Paxos::commit_finish()
{
  dout(20) << __func__ << " " << (last_committed+1) << dendl;
  utime_t end = ceph_clock_now();
  logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);

  assert(g_conf->paxos_kill_at != 8);

  // cancel lease - it was for the old value.
  // (this would only happen if message layer lost the 'begin', but
  // leader still got a majority and committed with out us.)
  lease_expire = utime_t(); // cancel lease

  last_committed++;
  last_commit_time = ceph_clock_now();

  // refresh first_committed; this txn may have trimmed.
  first_committed = get_store()->get(get_name(), "first_committed");

  _sanity_check_store();

  // tell everyone
  // send the commit message to every peon in the quorum
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;

    dout(10) << " sending commit to mon." << *p << dendl;
    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
                                      ceph_clock_now());
    commit->values[last_committed] = new_value;
    commit->pn = accepted_pn;
    commit->last_committed = last_committed;

    mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
  }

  assert(g_conf->paxos_kill_at != 9);

  // get ready for a new round.
  new_value.clear();

  // WRITING -> REFRESH
  // among other things, this lets do_refresh() -> mon->bootstrap() know
  // it doesn't need to flush the store queue
  // move to STATE_REFRESH
  assert(is_writing() || is_writing_previous());
  state = STATE_REFRESH;
  assert(commits_started > 0);
  --commits_started;

  // do_refresh makes the upper-layer services refresh their state and
  // pick up the newly committed information
  if (do_refresh()) {
    commit_proposal();
    if (mon->get_quorum().size() > 1) {
      extend_lease();
    }

    finish_contexts(g_ceph_context, waiting_for_commit);

    assert(g_conf->paxos_kill_at != 10);

    finish_round();
  }
}

handle_commit(peon)

// peon: handle the commit message sent by the leader
void Paxos::handle_commit(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_commit");
  MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_commit on " << commit->last_committed << dendl;

  logger->inc(l_paxos_commit);

  if (!mon->is_peon()) {
    dout(10) << "not a peon, dropping" << dendl;
    ceph_abort();
    return;
  }

  // store_state applies the committed values carried by the message to
  // the local store and advances last_committed accordingly
  op->mark_paxos_event("store_state");
  store_state(commit);

  if (do_refresh()) {
    // wake up the callbacks waiting for the commit
    finish_contexts(g_ceph_context, waiting_for_commit);
  }
}

At this point, a round of proposal resolution is complete: the Monitor sends an ack to the client, and all of the Paxos instances are once again consistent with one another. The leader's state transition diagram is shown below:

The corresponding transitions are:

  • active → updating: a proposal request arrives; the proposal is persisted and the decision phase begins
  • updating → writing: all quorum members have accepted the proposal; the commit starts
  • writing → refresh: the leader's commit has succeeded and the other nodes have been told to commit; the upper-layer services are refreshed and told the new state is readable
  • refresh → active: the refresh is done; the round is complete
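These transitions can be summarized in code. A sketch only: the real implementation sets state inline inside the functions shown earlier rather than through a transition function like this:

#include <stdexcept>

enum class LeaderState { active, updating, writing, refresh };

// Illustrative transition function for a normally operating leader,
// following the four transitions listed above.
LeaderState next(LeaderState s) {
  switch (s) {
  case LeaderState::active:   return LeaderState::updating; // proposal persisted, begin() sent
  case LeaderState::updating: return LeaderState::writing;  // whole quorum accepted, commit_start()
  case LeaderState::writing:  return LeaderState::refresh;  // local commit done, OP_COMMIT sent
  case LeaderState::refresh:  return LeaderState::active;   // upper layers refreshed, round done
  }
  throw std::logic_error("unreachable");
}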

The peon's state transition diagram is shown below:

A peon enters the updating state as soon as it receives a proposal, and while in that state it does not serve reads. After the proposal completes and the peon receives a new lease from the leader, it returns to the active state.
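The peon's read availability hinges on that lease. A minimal sketch of the check, using std::chrono in place of Ceph's utime_t and a simplified two-state peon; illustrative only:

#include <chrono>

enum class PeonState { active, updating };

struct PeonLike {
  PeonState state = PeonState::active;
  std::chrono::steady_clock::time_point lease_expire{}; // zero means no lease

  // A peon serves reads only while active and holding an unexpired lease;
  // handle_begin() cancels the lease, and the leader's extend_lease()
  // restores it after the commit.
  bool is_readable() const {
    return state == PeonState::active &&
           lease_expire != std::chrono::steady_clock::time_point{} &&
           std::chrono::steady_clock::now() < lease_expire;
  }
};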

Failure Recovery

Below we use a crashed leader and a crashed peon as examples to see how data is recovered. Before walking through the concrete cases, it helps to keep the instance's full state-transition picture in mind and to be clear about how data is recovered when an instance dies in each state:

After winning an election, the leader enters the recovering state to attempt data recovery. If it finds uncommitted data, it enters the updating_previous state and starts recovering that data. The two cases below show the recovery logic of the Monitor's Paxos.

Peon down

When a peon goes down, a new election is triggered. Because Monitor elections are decided by IP address, the original leader is bound to win again, and its data is guaranteed to be the newest; it only needs to run the data-recovery flow and try to commit whatever has not yet been committed.

When the downed node later rejoins the cluster, it synchronizes data during the probing phase and then joins normally; the leader does not change.

Leader down

The leader can go down in any state. Below we look at the whole recovery flow from two angles: what happens when the leader goes down, and how the cluster returns to normal after it comes back up.

down

After the leader goes down, one of the peons is elected as the new leader. The old leader may have died in any of the following states:

  • down in active: no data needs to be recovered
  • down in updating: if no peon accepted the proposal, nothing needs to be recovered; if some peon did, the other peons just need to learn that proposal
  • down in writing: all peons have already accepted the proposal, so the new leader re-proposes it
  • down in refresh: the old leader committed the proposal successfully; if a peon received the commit message the proposal is learned from it, otherwise it is re-proposed

up

When the old leader comes back up, it synchronizes data during the probing phase; if it is elected leader again, it enters the data-recovery flow.

When analyzing failure recovery, the key question is whether data can be lost or become inconsistent. A peon going down clearly causes neither. The only case to watch is the leader going down: data that was already committed is durably stored on the peons, so nothing is lost; and uncommitted data that at least one peon accepted will be re-learned, so again no data is lost and no inconsistency arises.

Achieving Consistency

To guarantee strongly consistent reads, two points need attention:

  • how to avoid reading stale data
  • how to avoid reading data that has not yet been committed

ceph-monitor guards reads with a lease mechanism: only nodes holding a lease may serve reads, and all leases are revoked while a proposal is in flight. In other words, the Paxos layer is unreadable during a proposal; for a workload like the Monitor's, which is overwhelmingly read-heavy, this is a reasonable trade-off.
Moreover, as the analysis above shows, every proposal carries a version number, and the application layer passes a version when reading. A proposal that has not yet committed can never be read; at the application layer, such a read simply blocks until the proposal becomes readable.
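A sketch of that version gate: a read asking for version v blocks until last_committed reaches v, so an uncommitted proposal can never be observed. This models the idea with a condition variable; Ceph itself implements it with wait_for_readable()-style Context callbacks rather than blocking waits:

#include <condition_variable>
#include <cstdint>
#include <mutex>

// Toy model of version-gated reads. Illustrative only.
struct VersionedReads {
  std::mutex m;
  std::condition_variable cv;
  uint64_t last_committed = 0;

  void on_commit(uint64_t v) {       // called when version v commits
    { std::lock_guard<std::mutex> lk(m); last_committed = v; }
    cv.notify_all();
  }
  void wait_readable(uint64_t v) {   // a read for version v blocks here
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [&] { return last_committed >= v; });
  }
};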

定制化实现

Compared with standard multi-paxos and other production Paxos implementations, the Paxos in ceph-monitor makes the following customizations:

  • quorum: quite unlike other consensus implementations, the quorum membership in ceph-paxos is fixed once the election completes, every subsequent decision requires the agreement of all quorum members, and any change in quorum membership triggers a new election. This simplifies the Paxos implementation, but if quorum membership changed frequently, availability and performance would suffer badly. Given the small number of Monitor nodes, the trade-off is reasonable

  • one value per decision: ceph-paxos decides only one value at a time and requires every quorum member to acknowledge it, i.e. participating nodes may have no holes in their logs, and the next proposal can only start after the previous one commits; this greatly simplifies data recovery. Proposal requests that arrive while a decision is in flight are aggregated by the ceph-paxos layer and by the application layer above it, which effectively reduces the Monitor's write pressure (see the sketch after this list)

  • reads: the leader grants leases to the peons, and a node holding a lease may serve read requests. While a proposal is being decided the peons' leases are revoked, preventing stale reads; after the proposal commits the peons are granted new leases. Since every proposal has a version number, the application layer reads with a version to ensure it sees the newest data. Together these mechanisms guarantee strongly consistent reads, and the lease is very effective for a read-mostly application like the Monitor
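The aggregation mentioned above can be modeled simply: while a round is in flight, new operations keep accumulating in the pending transaction and go out as one batched proposal when the instance returns to active. A sketch under those assumptions, not the actual PaxosService logic:

#include <string>
#include <utility>
#include <vector>

// Toy model of proposal aggregation: ops arriving while a round is in
// flight pile up in `pending` and are proposed as a single batch.
struct BatchingProposer {
  bool active = true;                                        // no round in flight
  std::vector<std::pair<std::string, std::string>> pending;  // aggregated KV ops

  void queue_op(std::string k, std::string v) {
    pending.emplace_back(std::move(k), std::move(v));
    trigger_propose();
  }
  void trigger_propose() {
    if (!active || pending.empty()) return;  // proposed later, when the round ends
    active = false;                          // only one proposal decided at a time
    auto batch = std::move(pending);         // everything queued so far, as one value
    pending.clear();
    begin(batch);                            // start begin/accept/commit for the batch
  }
  void on_round_finished() {                 // commit done, back to active
    active = true;
    trigger_propose();                       // drain whatever queued meanwhile
  }
  void begin(const std::vector<std::pair<std::string, std::string>>&) {
    /* send OP_BEGIN to the quorum... */
  }
};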
