如何使用raft算法开发强一致kv存储系统
Posted 2021-09-05 00:47 +0800 by ZhangJie ‐ 1 min read
本文内容
本文结合etcd源码来进行介绍,etcd/contrib/raftexample提供了一个基于etcd/raft实现的kv存储系统。从该示例出发,我们来看一看如何基于raft算法开发一个强一致的kv存储系统。
看完本文的源码分析后,上手一个raft强一致系统开发就不是什么难事了。
ps:假定读者已经阅读并理解了raft论文,这里有我的批注版的In Search of an Understandable Consensus Algorithm.pdf,读起来可能会好理解点。
etcd/raft
etcd服务端程序入口:see 源码
- 启动过程中区分当前节点类型:根据data-dir目录下的目录名member/proxy/empty来区分,然后启动etcd实例或者proxy;
- 启动etcd服务节点:startEtcd这个函数,逻辑主要包括启动供集群节点间通信的rafthttp服务,以及供客户端请求的服务;
- 启动etcd proxy:startProxy这个函数,逻辑主要是启动etcd代理;
etcd哪些部分值得学习:
- etcd proxy从项目功能上来说虽然很重要,但是从学习角度来说没那么有价值,不看这个;
- etcd server从项目功能上来说是核心,但是我们也没有必要学习所有的请求处理逻辑,重点是关注读写操作时如何基于raft实现强一致;
- raft:这部分是raft算法的核心实现,从理解raft论文到算法工程化需要额外做出巨大的优化,这些知识点往往是通用的,重点掌握;
raft部分:
- pb协议:
- raft peers的通信协议,see 源码;
- raft算法中提到核心的几个rpc就是Vote、AppendEntries,但是工程中需要考虑更多,详见上述pb中的enum MessageType;
- 上述pb中的message Message类型定义了rpc通信过程中的请求/响应,不同rpc通过MessageType type字段区分;
- 状态机:
- raft实现数据一致性是通过replicated log(复制日志)实现的,这里的replicated log有时也称为WAL(write ahead log);
- raft算法中,每个节点raftnode可能处于以下状态中的一种:follower、candidate、precandidate、leader;
- raft算法中,每个节点的状态可以通过一个状态机来建模;
了解了这些基础知识之后,我们结合etcd/contrib/raftexample来解释下raft如何选举,以及leader遇到写操作如何保证数据强一致。
etcd/raft如何进行leader选举
newRaftNode newRaftNode,see 源码,这个函数包括创建一个var rc raftNode,然后rc.startRaft(),这个函数包含非常重要的几个部分:
- startNode,see 源码,这个主要是建立好raftnode启动时的一些初始状态转换,有一个for事件循环处理,如改变raftnode的状态:tick函数、step函数,以及一些message的处理等等;
- serveRaft:
- serveChannels:see 源码
startNode:
- 如何查看这部分源码呢,首先从启动一个raftnode开始吧:see 源码;
- StartNode函数启动一个raftnode,节点刚启动的时候state都是follower:see 源码;
- StartNode→Bootstrap(peers)通过配置告诉当前raftnode有多少个raftpeers,然后这些raftpeers加入与当前节点所在的集群属于变更配置,也要记录到raftlog中;
- raftnode真正跑起来是在这里:see 源码,这里有个大的for循环,node的主要逻辑都在这里了;
- tickElection:for/switch-case n.tickC,选举逻辑,此时如果当前raftnode为follower或者candidate吧,此时的tick函数为tickElection,如果选举超时时间过了并且没收到leader的heartbeat来重置选举超时时间,此时会将MsgHup消息类型传入step函数中,将当前follower变为candidate发起选举:see 源码。这里的选举在raft论文中是直接就是选举动作,但是工程上做了优化,引入了一个可选的两阶段选举prevote。虽然可以tick是触发了tickElection,但是这个后续执行中会检查当前节点是否有资格成为leader,不一定有资格(比如自身的WAL不满足条件)。
- 假如有资格发起选举,则会调用becomeCandidate,会将当前raftnode的term+1,并且step函数变为stepCandidate。然后会调用r.poll来判断是否胜选:see 源码,其实这里是判断的自己给自己投票的话能否胜选,对于single raftnode的集群有用,假如是多节点集群那么这里无法胜选,继续看。ps:如果胜选就becomeLeader成为leader了。如果不是单节点,就要通过r.send发送投票给各个peers:see 源码。这里的r.send并不是真的网络发送,而是记录到r.msgs里面等下处理这里的r.msgs。注意这里r.send的时候已经编程了MsgVote类型了,表示投票请求,后续也应该收到MsgVoteResp。
- r.msgs什么时候处理呢?还是前面我们提到的这个大循环体,每轮循环都会检查r.msgs中有没有message要处理:see 源码,这里的函数n.rn.HasReady()方法检查到len(r.msgs)>0,则认为有消息要处理,这个消息最终会被包装到一个Ready{}事件中,这个事件会被丢到n.readyc这个chan中,什么时候处理在下面serveChannels中介绍。
- becomeLeader:see 源码,step函数变为stepLeader,tick函数则变成tickHeartbeat,意味着当前为leader需要给followers定时发送heartbeat来重置它们的选举超时时间,那么heartbeat是什么形式的呢?其实就是通过appendEntry,只不过entry为空,用这种空的entry来表示心跳。leader就要担负起write请求的重任了。
- 但是如果没胜选的话,raftnode的状态就是candidate,step函数未stepCandidate,下面会继续用到。
serveChannels: 前面关于r.msgs的消息没跟踪到在哪里处理的,我们看下是不是在serveChannels里面? serveChannels: see 源码
- 这个函数里面也有一个for事件循环,当它发现rc.node.Ready()有var rd Ready{}事件可处理时,如果rd上有非空的snapshot,就写入storage,然后将rd.Entries也记录到rd.HardState,然后将rd.Entries也写入storage,最后将rd.Messages发送到peers。我们感觉voteMsg是在这个时候发送给peers的,到底是不是呢:see 源码。是的,这里的rd事件就是从raftnode.Ready()从发从其raftnode.readyc这个channel中取出来的。取出来后通过transport发送出去,这样voteMsg就发送出去了,那么投票的响应又是什么时候收到、什么时候处理的呢?
- startRaft/AddPeer的时候会调用startPeer,内部会开始循环收包,收peer发来的raftmessage放入一个recvc chan中,startPeer中专门开启了一个goroutine来检查recvc中有没有peer发送来的消息,比如peer发送给我们的voteMsg的响应包:see 源码。这里通过raft.Process(ctx,m)对raftmessage进行处理。如何处理是在这个示例代码中定义的:see 源码,即调用step函数进行处理。我们再往回看下,发送这个消息前已经把节点的step函数修改为了stepCandidate,那我们再看下这个函数里面干了啥,猜测应该有判断是否收到多数投票确认的逻辑;
- stepCandidate:see 源码,我们不考虑可选的prevote阶段,很显然这个消息MsgVote的响应类型应该是MsgVoteResp,如果是的话,就继续r.poll检查下是否胜选吧,如果胜选了,则自己becomeLeader,然后广播appendEntries,这里append是干啥,是为了通治其他peers更新commit index吧。
这样leader选举就完成了!!!
etcd/raft leader执行put操作如何保证强一致
用etcd/raft实现强一致的系统示例:https://github.com/etcd-io/etcd/tree/main/contrib/raftexample。我们不妨从这个项目入手来看下到底是怎么工作的。上述项目是一个暴露http接口的kv强一致存储系统。
接下来重点看一个leader负责执行命令put <key> <value>
时的执行逻辑,是怎样的,领略下这个过程中raft扮演的角色。
put命令是通过http put method实现的,see 源码。
这里的处理逻辑也很简单,它直接调用了h.store.Propose(key, string(v)),h.store是一个kvstore,这里的Propose是干嘛呢?这里就可以跟raft算法中的MsgProp关联起来了,还记得吗?MsgProp这种消息类型是用来appendEntries的。这里的思想就是WAL(write ahead log)的思想,先把动作记录到日志中,后面在通过日志来更新状态机。状态机的状态都包含什么呢,我们前面已经知道有各种状态的流转,那么这个日志中记录的数据存储在哪呢?
就是这里的kvstore啊!一个raftnode启动后要把快照、日志中记录的事件还原到一个特定的存储中,这个示例中就是一个内存中的kv数据结构。h.store.Propose(key,string(v))首先异步地调用kvstore.Propose(key,val)将数据写入到proposeC这个chan中:see 源码,然后再异步地从中取出来:see 源码,通过rc.node.Propose(ctx, prop),转入raft.node.Propose实现:see 源码,这里的n.stepWait方法将MsgProp消息类型以及要写入的日志数据传给stepWait,这里面将消息写入到raftnode.propc就完事返回了。
startPeer从这个propc chan中取出消息m,然后r.Process(ctx, m)去处理,r.Process方法是在示例代码中自定义的,see 源码,通过r.Process进入step函数又来到r.Step(m),此时raftnode.Step函数是什么呢?赶紧看看发送MsgProp消息时又没有更改raftnode.Step,没有,那这个Step应该是stepLeader…没错,沿着stepLeader一路看下来:see 源码,这里果然是leader让peers appendEntries的动作,干了什么呢?
首先当前raftnode.appendEntries,把MsgProp消息里的日志项(可能有多条)先追加到自己的log entries里面,然后bcastAppend发送给所有的peers让它们去append entries,它们追加成功后肯定回回包MsgPropResp消息类型的消息。我们看看这个消息是在哪里处理的?感觉应该也是在startPeer函数中的收包逻辑里面。那应该也是从recvc chan中取出回包处理。
哈哈,看半天竟然没搜索到MsgPropResp消息类型,前面读源码时有个细节漏掉了。sendAppend的时候实际上会把消息类型改成MsgApp(MessageAppend)去追加日志,followers处理完成后响应一个MsgAppResp消息类型。对于leader raftnode,收到消息后触发状态转换,又要执行其step函数,此时step函数还是stepLeader,发现消息是MsgAppResp,准备处理:see 源码。
我们先考虑正常情况,leader收到响应发现follower在WAL中记录了发送的log entries,leader收到此响应后就会决定是否要更新该follower的next index(下次要发送的log entries开始索引)。然后判断是否可以更新leader的commit index了,更新了之后对client的读请求就可见了。leader更新了commit index之后也要通过bcastAppend通知followers更新commit index。
这些已提交索引之前的log entries会被发布到示例代码中的commitsC chan中,然后有一个goroutine专门读取这上面的commitC并把其中的entries读取出来,应用到我们的kvstore中,这样存储的一些数据就从WAL日志转化为了内存数据结构中的真实数据,可以对外提供查询服务了。
小结
大致就是这些内容吧!感兴趣的继续深挖下raftexample+raft实现吧。感觉自己已经理解了raft的核心思想以及如何使用raft来开发强一致存储系统了,读者是不是也有同感呢:)