raft算法有哪些实现 (raft分布式一致性协议)

SOFAJRaft 是一个基于 RAFT 一致性算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。 使用 SOFAJRaft 你可以专注于自己的业务领域,由 SOFAJRaft 负责处理所有与 RAFT 相关的技术难题,并且 SOFAJRaft 非常易于使用,你可以通过几个示例在很短的时间内掌握它。

功能特性

  • Leader 选举
  • 日志复制和恢复
  • 快照和日志压缩
  • 集群线上配置变更,增加节点、删除节点、替换节点等
  • 主动变更 Leader,用于重启维护,Leader 负载平衡等
  • 对称网络分区容忍性
  • 非对称网络分区容忍性
  • 容错性,少数派故障,不影响系统整体可用性
  • 多数派故障时手动恢复集群可用
  • 高效的线性一致读,ReadIndex/LeaseRead
  • 流水线复制
  • 内置了基于 Metrics 类库的性能指标统计,有丰富的性能统计指标
  • 通过了 Jepsen 一致性验证测试
  • JRaft 中包含了一个嵌入式的分布式 KV 实现

介绍

本介绍内容来自 braft 文档,原文链接请参见这里。braft 的关于算法和应用本身的文档非常优秀,由于 jraft 脱胎自 braft,我们强烈推荐阅读上述文档以了解 raft 算法的基本原理和应用。

分布式一致性

分布式一致性 (distributed consensus) 是分布式系统中最基本的问题,用来保证一个分布式系统的可靠性以及容灾能力。简单的来讲,就是如何在多个机器间对某一个值达成一致, 并且当达成一致之后,无论之后这些机器间发生怎样的故障,这个值能保持不变。 抽象定义上, 一个分布式系统里的所有进程要确定一个值 v,如果这个系统满足如下几个性质, 就可以认为它解决了分布式一致性问题, 分别是:

  • Termination: 所有正常的进程都会决定 v 具体的值,不会出现一直在循环的进程。
  • Validity: 任何正常的进程确定的值 v', 那么 v' 肯定是某个进程提交的。比如随机数生成器就不满足这个性质。
  • Agreement: 所有正常的进程选择的值都是一样的。

一致性状态机

对于一个无限增长的序列 a[1, 2, 3…], 如果对于任意整数 i, a[i] 的值满足分布式一致性,这个系统就满足一致性状态机的要求。 基本上所有的系统都会有源源不断的操作, 这时候单独对某个特定的值达成一致是不够的。为了真实系统保证所有的副本的一致性,通常会把操作转化为 write-ahead-log(简称WAL)。然后让系统的所有副本对WAL保持一致,这样每个进程按照顺序执行WAL里的操作,就能保证最终的状态是一致的。

javaraft实现,raft算法和实现

RAFT

RAFT 是一种新型易于理解的分布式一致性复制协议,由斯坦福大学的 Diego Ongaro 和 John Ousterhout 提出,作为 RAMCloud 项目中的中心协调组件。Raft 是一种 Leader-Based 的 Multi-Paxos 变种,相比 Paxos、Zab、View Stamped Replication 等协议提供了更完整更清晰的协议描述,并提供了清晰的节点增删描述。 Raft 作为复制状态机,是分布式系统中最核心最基础的组件,提供命令在多个节点之间有序复制和执行,当多个节点初始状态一致的时候,保证节点之间状态一致。系统只要多数节点存活就可以正常处理,它允许消息的延迟、丢弃和乱序,但是不允许消息的篡改(非拜占庭场景)。

javaraft实现,raft算法和实现

Raft 可以解决分布式理论中的 CP,即一致性和分区容忍性,并不能解决 Available 的问题。其中包含分布式系统中一些通常的功能:

  • Leader Election
  • Log Replication
  • Membership Change
  • Log Compaction

RAFT 可以做什么

通过 RAFT 提供的一致性状态机,可以解决复制、修复、节点管理等问题,极大的简化当前分布式系统的设计与实现,让开发者只关注于业务逻辑,将其抽象实现成对应的状态机即可。基于这套框架,可以构建很多分布式应用:

  • 分布式锁服务,比如 Zookeeper
  • 分布式存储系统,比如分布式消息队列、分布式块系统、分布式文件系统、分布式表格系统等
  • 高可靠元信息管理,比如各类 Master 模块的 HA

Counter 例子详解

本文档主要介绍一个基于 jraft 的分布式计数器的例子。

场景

在多个节点(机器)组成的一个 raft group 中保存一个分布式计数器,该计数器可以递增和获取,并且在所有节点之间保持一致,任何少数节点的挂掉都不会影响对外提供的两个服务:

  1. incrmentAndGet(delta) 递增 delta 数值并返回递增后的值。
  2. get() 获取最新的值

RPC 请求

jraft 底层使用 bolt 作为通讯框架,定义两个请求

  1. IncrementAndGetRequest,用于递增
public class IncrementAndGetRequest implements Serializable {
 private static final long serialVersionUID = -5623664785560971849L;
 private long delta;
 public long getDelta() {
 return this.delta;
 }
 public void setDelta(long delta) {
 this.delta = delta;
 }
}
  1. GetValueRequest,用于获取最新值:
public class GetValueRequest implements Serializable {
 private static final long serialVersionUID = 9218253805003988802L;
 public GetValueRequest() {
 super();
 }
}

应答结果 ValueResponse,包括:

  1. success 是否成功
  2. value 成功情况下返回的最新值
  3. errorMsg 失败情况下的错误信息
  4. redirect 发生了重新选举,需要跳转的新的leader节点。
public class ValueResponse implements Serializable {
 private static final long serialVersionUID = -4220017686727146773L;
 private long value;
 private boolean success;
 /**
 * redirect peer id
 */
 private String redirect;
 private String errorMsg;
 public String getErrorMsg() {
 return this.errorMsg;
 }
 public void setErrorMsg(String errorMsg) {
 this.errorMsg = errorMsg;
 }
 ......
}

IncrementAndAddClosure 用于 Leader 服务端接收 IncrementAndGetRequest 请求后的回调处理:

public class IncrementAndAddClosure implements Closure {
 private CounterServer counterServer;
 private IncrementAndGetRequest request;
 private ValueResponse response;
 private Closure done; // 网络应答callback
 public IncrementAndAddClosure(CounterServer counterServer, IncrementAndGetRequest request, ValueResponse response,
 Closure done) {
 super();
 this.counterServer = counterServer;
 this.request = request;
 this.response = response;
 this.done = done;
 }
 @Override
 public void run(Status status) {
 // 返回应答给客户端
 if (this.done != null) {
 done.run(status);
 }
 }
 public IncrementAndGetRequest getRequest() {
 return this.request;
 }
 public void setRequest(IncrementAndGetRequest request) {
 this.request = request;
 }
 public ValueResponse getResponse() {
 return this.response;
 }
}

服务端

状态机 CounterStateMachine

首先持有一个初始值:

public class CounterStateMachine extends StateMachineAdapter {
 /**
 * counter value
 */
 private AtomicLong value = new AtomicLong(0); 

实现核心的 onApply(iterator) 方法,应用用户提交的请求到状态机:

 @Override
 public void onApply(Iterator iter) {
 // 遍历日志
 while (iter.hasNext()) {
 long delta = 0;
 IncrementAndAddClosure closure = null;
 // done 回调不为null,必须在应用日志后调用,如果不为 null,说明当前是leader。
 if (iter.done() != null) {
 // 当前是leader,可以直接从 IncrementAndAddClosure 中获取 delta,避免反序列化
 closure = (IncrementAndAddClosure) iter.done();
 delta = closure.getRequest().getDelta();
 } else {
 // 其他节点应用此日志,需要反序列化 IncrementAndGetRequest,获取 delta
 ByteBuffer data = iter.getData();
 try {
 IncrementAndGetRequest request = Codecs.getSerializer(Codecs.Hessian2).decode(data.array(),
 IncrementAndGetRequest.class.getName());
 delta = request.getDelta();
 } catch (CodecException e) {
 LOG.error("Fail to decode IncrementAndGetRequest", e);
 }
 }
 long prev = this.value.get();
 // 更新状态机
 long updated = value.addAndGet(delta);
 // 更新后,确保调用 done,返回应答给客户端。
 if (closure != null) {
 closure.getResponse().setValue(updated);
 closure.getResponse().setSuccess(true);
 closure.run(Status.OK());
 }
 LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
 iter.next();
 }
 }

CounterServer

启动一个 raft node节点,提供分布式计数器服务,内部使用 jraft 提供的 RaftGroupService 服务框架:

public class CounterServer {
 // jraft 服务端服务框架
 private RaftGroupService raftGroupService;
 // raft 节点
 private Node node;
 // 业务状态机
 private CounterStateMachine fsm;
 public CounterServer(String dataPath, String groupId, PeerId serverId, NodeOptions nodeOptions) throws IOException {
 // 初始化路径
 FileUtils.forceMkdir(new File(dataPath));
 // 初始化全局定时器
 TimerManager.init(50);
 // 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开.
 RpcServer rpcServer = new RpcServer(serverId.getPort());
 RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
 // 注册业务处理器
 rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
 rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
 // 初始化状态机
 this.fsm = new CounterStateMachine();
 // 设置状态机到启动参数
 nodeOptions.setFsm(this.fsm);
 // 设置存储路径
 // 日志, 必须
 nodeOptions.setLogUri(dataPath + File.separator + "log");
 // 元信息, 必须
 nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
 // snapshot, 可选, 一般都推荐
 nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
 // 初始化 raft group 服务框架
 this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
 // 启动
 this.node = this.raftGroupService.start();
 }
 public CounterStateMachine getFsm() {
 return this.fsm;
 }
 public Node getNode() {
 return this.node;
 }
 public RaftGroupService RaftGroupService() {
 return this.raftGroupService;
 }
 /**
 * 生成重定向请求
 */
 public ValueResponse redirect() {
 ValueResponse response = new ValueResponse();
 response.setSuccess(false);
 if (node != null) {
 PeerId leader = node.getLeaderId();
 if (leader != null) {
 response.setRedirect(leader.toString());
 }
 }
 return response;
 }
 public static void main(String[] args) throws IOException {
 if (args.length != 4) {
 System.out
 .println("Useage : java com.alipay.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}");
 System.out
 .println("Example: java com.alipay.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
 System.exit(1);
 }
 String dataPath = args[0];
 String groupId = args[1];
 String serverIdStr = args[2];
 String initConfStr = args[3];
 NodeOptions nodeOptions = new NodeOptions();
 // 为了测试, 调整 snapshot 间隔等参数
 nodeOptions.setElectionTimeoutMs(5000);
 nodeOptions.setDisableCli(false);
 nodeOptions.setSnapshotIntervalSecs(30);
 // 解析参数
 PeerId serverId = new PeerId();
 if (!serverId.parse(serverIdStr)) {
 throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
 }
 Configuration initConf = new Configuration();
 if (!initConf.parse(initConfStr)) {
 throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
 }
 // 设置初始集群配置
 nodeOptions.setInitialConf(initConf);
 // 启动
 CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);
 System.out.println("Started counter server at port:"
 + counterServer.getNode().getNodeId().getPeerId().getPort());
 }
}

启动三个节点的参数类似:

windows 用户请注意第一个参数的数据目录设置

/tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

分别为 server1/server2/server3三个目录,raft group名称为 counter,节点ip也分别为

127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

注册的网络请求处理器,我们看下 IncrementAndGetRequestProcessor 实现,一个普通的 bolt processor :

public class IncrementAndGetRequestProcessor extends AsyncUserProcessor<IncrementAndGetRequest> {
 private static final Logger LOG = LoggerFactory.getLogger(IncrementAndGetRequestProcessor.class);
 private CounterServer counterServer;
 public IncrementAndGetRequestProcessor(CounterServer counterServer) {
 super();
 this.counterServer = counterServer;
 }
 @Override
 public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, IncrementAndGetRequest request) {
     // 非leader,生成跳转请求
 if (!counterServer.getFsm().isLeader()) {
 asyncCtx.sendResponse(counterServer.redirect());
 return;
 }
 // 构建应答回调
 ValueResponse response = new ValueResponse();
 IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response, new Closure() {
 @Override
 public void run(Status status) {
 // 提交后处理
 if (!status.isOk()) {
 // 提交失败,返回错误信息
 response.setErrorMsg(status.getErrorMsg());
 response.setSuccess(false);
 }
 // 成功,返回ValueResponse应答
 asyncCtx.sendResponse(response);
 }
 });
 try {
 // 构建提交任务
 Task task = new Task();
 task.setDone(closure); // 设置回调
 // 填充数据,将请求用 hessian2序列化到 data 字段
 task.setData(ByteBuffer.wrap(Codecs.getSerializer(Codecs.Hessian2).encode(request)));
 // 提交到 raft group
 counterServer.getNode().apply(task);
 } catch (CodecException e) {
 // 处理序列化异常
 LOG.error("Fail to encode IncrementAndGetRequest", e);
 ValueResponse responseObject = response;
 responseObject.setSuccess(false);
 responseObject.setErrorMsg(e.getMessage());
 asyncCtx.sendResponse(responseObject);
 }
 }
 @Override
 public String interest() {
 return IncrementAndGetRequest.class.getName();
 }
}

客户端

客户端 CounterClient 比较简单,主要使用 jraft 提供的 RouteTable 来刷新获取最新的 leader 节点,然后发送请求到 leader节点即可:

public class CounterClient {
 public static void main(String[] args) throws Exception { 
 if (args.length != 2) {
 System.out.println("Useage : java com.alipay.jraft.example.counter.CounterClient {groupId} {conf}");
 System.out
 .println("Example: java com.alipay.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
 System.exit(1);
 }
 String groupId = args[0];
 String confStr = args[1];
 Configuration conf = new Configuration();
 if (!conf.parse(confStr)) {
 throw new IllegalArgumentException("Fail to parse conf:" + confStr);
 }
 // 更新raft group配置
 RouteTable.getInstance().updateConfiguration(groupId, conf);

接下来初始化 RPC 客户端并更新路由表:

BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
 throw new IllegalStateException("Refresh leader failed");
}

获取 leader 后发送请求:

PeerId leader = RouteTable.getInstance().selectLeader(groupId);
System.out.println("Leader is " + leader);
int n = 1000;
CountDownLatch latch = new CountDownLatch(n);
long start = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
 incrementAndGet(cliClientService, leader, i, latch);
}
latch.await();
System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
System.exit(0);

incrementAndGet 方法实现比较简单了:

private static void incrementAndGet(BoltCliClientService cliClientService, PeerId leader, long delta,
 CountDownLatch latch) throws RemotingException, InterruptedException {
 // 构建 IncrementAndGetRequest 请求并发送到 leader
 IncrementAndGetRequest request = new IncrementAndGetRequest();
 request.setDelta(delta);
 cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request,
 new InvokeCallback() {
 @Override
 public void onResponse(Object result) {
 latch.countDown();
 System.out.println("incrementAndGet result:" + result);
 }
 @Override
 public void onException(Throwable e) {
 e.printStackTrace();
 latch.countDown();
 }
 @Override
 public Executor getExecutor() {
 return null;
 }
 }, 5000);
}

Snapshot 实现

为了避免每次节点重启的时候,重新应用一遍所有的日志,并且避免保存所有的日志,可以使用 snapshot 机制,也就是为状态机做一个 checkpoint,保存当时状态机的状态,删除在此之前的所有日志,核心是实现 StateMachine的两个方法:

  1. onSnapshotLoad,启动或者安装 snapshot 后加载 snapshot
  2. onSnapshotSave ,定期保存 snapshot

我们先为 Counter实现一个snapsho t数据文件:

public class CounterSnapshotFile {
 private static final Logger LOG = LoggerFactory.getLogger(CounterSnapshotFile.class);
 private String path;
 public CounterSnapshotFile(String path) {
 super();
 this.path = path;
 }
 public String getPath() {
 return this.path;
 }
 /**
 * Save value to snapshot file.
 * @param value
 * @return
 */
 public boolean save(long value) {
 try {
 FileUtils.writeStringToFile(new File(path), String.valueOf(value));
 return true;
 } catch (IOException e) {
 LOG.error("Fail to save snapshot", e);
 return false;
 }
 }
 public long load() throws IOException {
 String s = FileUtils.readFileToString(new File(path));
 if (!StringUtils.isBlank(s)) {
 return Long.parseLong(s);
 }
 throw new IOException("Fail to load snapshot from " + path + ",content: " + s);
 }
}

保存到指定的 path 。

然后实现 StateMachine的两个方法:

 public boolean onSnapshotLoad(SnapshotReader reader) {
 // leader不用从 snapshot 加载,他不会接受 snapshot 安装请求
 if (isLeader()) {
 LOG.warn("Leader is not supposed to load snapshot");
 return false;
 }
 // 未找到数据文件,忽略
 if (reader.getFileMeta("data") == null) {
 LOG.error("Fail to find data file in {}", reader.getPath());
 return false;
 }
 // 将 snapshot 保存在 reader.getPath()/data 文件里
 CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
 try {
 this.value.set(snapshot.load());
 return true;
 } catch (IOException e) {
 LOG.error("Fail to load snapshot from {}", snapshot.getPath());
 return false;
 }
 }
 public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
 // 获取此刻状态机状态
 final long currVal = this.value.get();
 // 异步保存,避免阻塞状态机
 Utils.runInThread(new Runnable() {
 @Override
 public void run() {
 CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
 if (snapshot.save(currVal)) {
 if (writer.addFile("data")) {
 done.run(Status.OK());
 } else {
 done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
 }
 } else {
 done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
 }
 }
 });
 }

snapshot 的间隔可以通过 NodeOptions 的 snapshotIntervalSecs 控制,默认一个小时。