
colins / raft-java-demo

Apache-2.0

raft-java-demo

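I. Raft Primer

Raft is currently the best-known distributed consensus algorithm and is widely used (directly or in Raft-like variants) in distributed frameworks and components such as Redis, RocketMQ, Kafka, and Nacos (CP mode).

Following the Raft paper, Raft can be split into these four functional modules:

  • Leader election
  • Log replication and heartbeats
  • Persistence
  • Log compaction and snapshots (not implemented here)

These four modules are not fully independent: how far the logs have been replicated affects leader election, snapshots affect log replication, and so on. To build things up step by step, some features are changed and optimized along the way. For example, after log replication is implemented, the persistence part revisits replication so that leader/follower log conflicts are resolved more efficiently.

I won't introduce Raft in more detail here; please get a basic understanding of the algorithm before reading on. The following references are provided:

This implementation is not fully identical to the Raft paper. It makes quite a few changes, but the core ideas are the same. Please keep that in mind!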

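II. Quick Start

Test classes are provided so you can try the cluster out quickly.

Starting the cluster

Each test method represents one node, and each node writes its own log file when it starts. Because no registry is used, nodes cannot join dynamically: the cluster members must be configured at startup. Five cluster nodes are currently provided.

Plenty of DEBUG logging is enabled so you can follow the state changes. If the following exception appears, it is expected (it is mainly caused by the heartbeat-detection task being interrupted):

Client testing

A test method for sending commands to the cluster is also provided. Only the SET and GET commands are supported for now, and redirects are handled automatically, so each request ends up at the leader node.

You will get a result like this: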

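III. Overview of the Main Flows

Having read the references above, you should have a basic understanding of Raft. In this article we implement a simple KV store on top of Raft, which I split into the following roles:

RPC module: responsible for passing messages between nodes, such as heartbeats, log entries, and election requests

Node module: a node is in one of three states (leader, follower, candidate), and each state has different responsibilities

State machine: handles node state changes, consistency of log persistence, and consistency of voting

Scheduled tasks: the leader sends heartbeats periodically, followers periodically check whether the leader is still alive, and so on

Log module: log entries are persisted to a local file and also replicated to the other nodes

These roles work together to implement the following main flows: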

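1. Election flow

The implementation details are covered later; here is just the rough outline:

  1. A follower notices that the leader is down, becomes a candidate, and starts an election
  2. The other followers receive the vote request and decide, based on certain conditions, whether to grant it (true or false)
  3. Once the candidate has received votes from a majority, it becomes the leader
  4. The new leader sends heartbeats to stop the other followers from becoming candidates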
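The four steps above can be sketched as a tiny state model. This is a minimal sketch under simplifying assumptions, not the project's code: the class `ElectionSketch` and its fields and methods are hypothetical, RPCs are replaced by a count of granted votes, and the majority is computed over the whole cluster (the peers plus the candidate itself).

```java
import java.util.Objects;

/**
 * Hypothetical, simplified model of the election flow; it is not the
 * project's RaftNodeInfo/StateMachines API.
 */
class ElectionSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    final String selfId;
    final int peerCount;          // number of OTHER nodes in the cluster
    Role role = Role.FOLLOWER;
    long currentTerm = 0L;
    String votedFor = null;

    ElectionSketch(String selfId, int peerCount) {
        this.selfId = Objects.requireNonNull(selfId);
        this.peerCount = peerCount;
    }

    /** Step 1: the leader looks dead, so the follower becomes a candidate. */
    void becomeCandidate() {
        role = Role.CANDIDATE;
        currentTerm++;            // every election starts a new term
        votedFor = selfId;        // the candidate votes for itself
    }

    /** Steps 3 and 4: tally the peers' votes; the candidate's own vote is added here. */
    boolean onVoteResults(int votesGrantedByPeers) {
        if (role != Role.CANDIDATE) {
            return false;         // e.g. a heartbeat already turned it back into a follower
        }
        int clusterSize = peerCount + 1;
        int majority = clusterSize / 2 + 1;
        if (votesGrantedByPeers + 1 >= majority) {
            role = Role.LEADER;   // from now on it must send heartbeats
            return true;
        }
        role = Role.FOLLOWER;     // lost: wait for the next election timeout
        return false;             // votedFor is kept: one vote per term
    }
}
```

For example, in the five-node cluster used by the tests a candidate has four peers and needs two of their votes, since 2 + 1 = 3 is a majority of 5.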

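2. Heartbeat flow

Note: this differs from the paper. I separated heartbeats from log replication so that they are no longer coupled. When there are no client requests, recording heartbeat log entries seems pointless to me, and when there are no data log entries (or every node holds the same data log), it doesn't matter which node becomes leader.

The implementation details are covered later; here is just the rough outline:

  1. The leader periodically sends heartbeat requests to the followers to tell them it is still alive, so that none of them tries to take over
  2. A follower returns a heartbeat response when it receives a heartbeat
  3. If the leader does not receive responses from a majority, it automatically steps down to follower and stops serving clients

(Why have heartbeat responses and automatic step-down at all? We'll come back to that later.)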
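The step-down rule in step 3 boils down to a majority check. The sketch below assumes, as the article's code does, that the leader counts itself and counts only the peers that answered before the heartbeat timeout; `HeartbeatQuorum` and its methods are hypothetical names.

```java
/** Hypothetical helper: does a leader still reach a majority of the cluster? */
class HeartbeatQuorum {
    /** Smallest number of nodes that forms a majority of clusterSize nodes. */
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /**
     * @param peerCount number of OTHER nodes in the cluster
     * @param acks      heartbeat responses received before the timeout
     * @return true if the leader (counting itself) still reaches a majority
     */
    static boolean shouldStayLeader(int peerCount, int acks) {
        return acks + 1 >= majority(peerCount + 1);
    }
}
```

With 5 nodes the leader needs 2 responses (2 + 1 = 3 of 5); with 4 nodes it also needs 2 (2 + 1 = 3 of 4), so a single response is not enough in either case.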

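3. KV client request flow

Since we are building a simple KV store, there have to be clients sending commands:

  1. The client sends a SET or GET command, and the cluster returns success or the data
  2. A SET command is handled only by the leader, which replicates it to the followers and then returns success or failure depending on the outcome
  3. A GET command is, for now, also handled only by the leader, which returns the value, or null if there is none (GET produces no log entry)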
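Steps 1 to 3 need two small pieces: validating the command, and applying a committed SET to an in-memory map that GET reads from. This is a sketch under assumptions: `KvStateMachine` is a hypothetical class, commands are whitespace-separated `SET key value` / `GET key` strings, and values cannot contain spaces.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory KV state machine for "SET key value" / "GET key". */
class KvStateMachine {
    private final Map<String, String> data = new HashMap<>();

    /** Accepts exactly "SET key value" or "GET key". */
    static boolean isValid(String command) {
        if (command == null) {
            return false;
        }
        String[] parts = command.trim().split("\\s+");
        switch (parts[0]) {
            case "SET": return parts.length == 3;
            case "GET": return parts.length == 2;
            default:    return false;
        }
    }

    /** Applies a SET that has already been committed through the log. */
    void applySet(String command) {
        String[] parts = command.trim().split("\\s+");
        if (parts.length != 3 || !"SET".equals(parts[0])) {
            throw new IllegalArgumentException("not a SET command: " + command);
        }
        data.put(parts[1], parts[2]);
    }

    /** GET produces no log entry; returns null when the key is absent. */
    String get(String key) {
        return data.get(key);
    }
}
```

Replication is deliberately left out: in the flow above, `applySet` would only run after the leader has collected enough pre-commit acknowledgements.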

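Log replication and persistence between nodes are covered later, but you can already see why C, A, and P cannot all be had at once in a distributed system. If you want high availability and good performance, you have to return as soon as the leader has flushed to disk, or even flush asynchronously, which inevitably risks data loss or inconsistency between leader and followers. If you want consistency, you have to wait until the log has been replicated to the other nodes before returning (the log replication flow is described next).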

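4. Log replication flow

As mentioned above, heartbeats and log entries are separated; only client SET commands produce log entries.

  1. On receiving a client request, the leader first pre-commits the entry in memory, then sends a pre-commit command to all followers
  2. A follower receiving the pre-commit likewise stores the entry in memory first, then responds to the leader
  3. Once the leader has enough follower responses to form a majority of the cluster (counting itself), it flushes the entry to disk; otherwise it returns a failure to the client
  4. After the leader has flushed successfully, it sends a commit (flush) request to all followers and returns success to the client (without waiting for the followers' flush results)

This is a classic CP flow: it keeps the data consistent and prevents loss, at a large cost in performance. (Notice that even so, a follower can still lose data, for example a newly joined follower, or a follower whose flush fails. What then? That is covered next.)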
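The leader side of these four steps can be sketched as a cache of pre-committed entries plus a list that stands in for the log file. Everything here is a hypothetical, single-threaded simplification: `TwoPhaseReplicationSketch` is not the project's `LogManage`, follower acknowledgements are passed in as a count, and the commit RPC to followers is only a comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical leader-side sketch of the pre-commit / commit flow. */
class TwoPhaseReplicationSketch {
    private final Map<Long, String> preCommitCache = new HashMap<>(); // step 1: memory only
    private final List<String> durableLog = new ArrayList<>();        // stands in for the log file
    private final int peerCount;                                      // number of followers
    private long nextCacheId = 1L;

    TwoPhaseReplicationSketch(int peerCount) {
        this.peerCount = peerCount;
    }

    /** Step 1: keep the entry in memory; the id would be sent along with the pre-commit RPC. */
    long preCommit(String command) {
        long id = nextCacheId++;
        preCommitCache.put(id, command);
        return id;
    }

    /**
     * Steps 3 and 4: with a majority (leader + acks), "flush" the entry and return its
     * 1-based log index; otherwise drop it and return -1 so the client gets a failure.
     */
    long commitIfQuorum(long cacheId, int followerAcks) {
        String command = preCommitCache.remove(cacheId);
        if (command == null) {
            throw new IllegalStateException("unknown cacheId " + cacheId);
        }
        int majority = (peerCount + 1) / 2 + 1;
        if (followerAcks + 1 < majority) {
            return -1L;
        }
        durableLog.add(command);
        // A real leader would now send AppendEntriesCommit to all followers without waiting.
        return durableLog.size();
    }

    List<String> log() {
        return durableLog;
    }
}
```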

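5. Log verification flow

As noted above, log entries can still be lost, so we need a scheduled task that periodically checks for missing entries. Because this is closely tied to the log design, the details come later; here is a quick run-through of the flow:

  1. The follower runs a scheduled task that periodically checks its log file for missing entries
  2. If any are missing, it sends a pull request to the leader for exactly those entries
  3. It then fills them into the log file, so that its log ends up aligned with the leader's

Does it have to scan the whole file from start to end every time? Of course not. Already-scanned regions are skipped thanks to a checkpoint: each scan only covers the range from the checkpoint to lastLogIndex.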
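The checkpoint idea can be sketched as follows, assuming the follower can tell which indexes it holds locally (here a `Set<Long>`) and knows `lastLogIndex`. `LogGapScanner` is a hypothetical name. The checkpoint only moves past the contiguous prefix of present entries, so a gap keeps being reported until it has been filled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical follower-side scan for missing log indexes. */
class LogGapScanner {
    private long checkpoint; // every index <= checkpoint is known to be present

    LogGapScanner(long checkpoint) {
        this.checkpoint = checkpoint;
    }

    /**
     * Returns the indexes in (checkpoint, lastLogIndex] missing locally and advances
     * the checkpoint to the end of the contiguous run of present entries.
     */
    List<Long> scan(Set<Long> presentIndexes, long lastLogIndex) {
        List<Long> missing = new ArrayList<>();
        long newCheckpoint = checkpoint;
        boolean contiguous = true;
        for (long i = checkpoint + 1; i <= lastLogIndex; i++) {
            if (presentIndexes.contains(i)) {
                if (contiguous) {
                    newCheckpoint = i;
                }
            } else {
                missing.add(i);      // these would go into the pull request to the leader
                contiguous = false;
            }
        }
        checkpoint = newCheckpoint;
        return missing;
    }

    long checkpoint() {
        return checkpoint;
    }
}
```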

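IV. Module Overview

1. RPC module

We use the Netty framework here; every node is both a client and a server.

In the original Raft algorithm, there are the following kinds of RPC:

RequestVote RPC: initiated by a candidate during an election.
AppendEntries RPC: initiated by the leader to replicate log entries; it also serves as the heartbeat mechanism.

I split these up more finely:

  • RequestVoteRPC: vote request RPC, initiated by a candidate during an election
  • RequestVoteResult: vote response RPC, sent by a follower
  • HeartBeatRequest: heartbeat RPC, sent by the leader continuously on a timer
  • HeartBeatResult: heartbeat response RPC, sent by a follower
  • AppendEntriesPreCommit: log pre-commit RPC, initiated by the leader
  • AppendEntriesPreCommitResult: log pre-commit response RPC, sent by a follower
  • AppendEntriesCommit: log commit RPC; after a successful pre-commit, the leader sends the actual commit command
  • LogIndexPull: log pull RPC; when a follower's periodic check finds missing entries, it pulls them from the leader
  • LogIndexPullResult: log pull response RPC; the leader sends the missing entries to the follower
  • ClientRequest: client request RPC, a command sent to the cluster by a KV store client
  • ClientResponse: client response RPC, the reply to the client

Each of them corresponds to an entity class:

The overall RPC design (encoding/decoding, serialization, and so on) is much the same as in the RPC framework I wrote earlier, so I won't repeat it here. If you're interested, see my article: How to Hand-Write an RPC Framework from 0 to 1

Here I'll only describe what changed compared with that framework. Previously the transferred entities were fixed; now there are many more of them, and many calls need a synchronous response, so generic type handling was added. In the example below, a whole request takes just two lines of code: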

RpcSession<ClientResponse, ClientRequest> rpcSession = RpcSessionFactory.<ClientResponse, ClientRequest>openSession(serverConfig, clientRequest);
ClientResponse clientResponse = rpcSession.<ClientResponse>syncSend(4000L);

Three request modes are supported: synchronous wait, wait with a timeout, and asynchronous:

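public interface RpcSession<R, T> {

    /** Send and block until the response arrives */
    <R> R syncSend();

    /** Send and block for at most timeout (ms) */
    <R> R syncSend(long timeout);

    /** Send without waiting for the response */
    void asyncSend();
}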
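A common way to build `syncSend(timeout)` on top of Netty's asynchronous writes is to register a future under the request id before sending, and to complete it from the inbound handler when a response with the same id arrives. The article does not show how `RpcSession` is implemented, so this is only a sketch of that technique: `PendingRequests` and its methods are hypothetical, and returning `null` on timeout is an assumption that matches how the callers in this article null-check the result.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical request/response correlation for a syncSend(timeout)-style call. */
class PendingRequests<R> {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Allocates a new request id (it would be stored in the outgoing message). */
    long nextId() {
        return ids.incrementAndGet();
    }

    /** Registers the id before the message is written, so an early response is not lost. */
    CompletableFuture<R> register(long requestId) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Called by the inbound handler when a response carrying requestId arrives. */
    void complete(long requestId, R response) {
        CompletableFuture<R> future = pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    /** Waits at most timeoutMillis; returns null on timeout or interrupt. */
    R await(long requestId, CompletableFuture<R> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.remove(requestId);          // a late response is simply dropped
            return null;
        } catch (InterruptedException e) {
            pending.remove(requestId);
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Removing the entry on timeout matters: otherwise every lost heartbeat response would leave a future behind in the map.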

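Take a look if you're interested; the RPC directory and all the Netty handlers are shown below:

2. Node module

A node is one of three kinds (leader, candidate, follower), so I abstracted a node interface with three implementations, plus one unified service facade and one global node-info class.

The node interface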

public interface RaftNode {

    /** Client request (produces a log entry): only the leader handles it; a follower returns the leader's address, a candidate rejects it */
    ClientResponse clientRequestHandler(ClientRequest command, List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException;

    /** Log pre-commit from the leader: the entry is cached first */
    AppendEntriesPreCommitResult logPreCommitHandler(AppendEntriesPreCommit appendEntriesPreCommit);

    /** Log commit request from the leader */
    void logCommitHandler(AppendEntriesCommit appendEntriesCommit);

    /** Log pull request sent by a follower */
    LogIndexPullResult sendLogPullRequest(List<Long> pullLogIndex);

    /** The leader handles a follower's pull request */
    LogIndexPullResult logPullRequestHandler(LogIndexPull logIndexPull);

    /** Start an election: only a candidate does this */
    boolean callVoteRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException;

    /** Handle a vote request */
    RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC);

    /** Send heartbeats: only the leader does this, to keep other nodes from becoming candidates */
    boolean callHeartBeatRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException;

    /** Handle a heartbeat: only followers/candidates do this */
    HeartBeatResult heartBeatHandler(HeartBeatRequest heartBeatRequest);
}

The three implementations

The unified service facade

public class RaftNodeService {

    private static final Logger log = LoggerFactory.getLogger(RaftNodeService.class);

    // Heartbeat interval in milliseconds
    private final static long INTERVAL_TIME = 1500L;

    // One RaftNode implementation per state; every call is dispatched on the current state
    private static final Map<NodeStatusEnums, RaftNode> raftNodeMap = new ConcurrentHashMap<>(8);

    static {
        raftNodeMap.put(NodeStatusEnums.LEADER, new LeaderRaftNode());
        raftNodeMap.put(NodeStatusEnums.CANDIDATE, new CandidateRaftNode());
        raftNodeMap.put(NodeStatusEnums.FOLLOW, new FollowRaftNode());
    }

    /**
     * Initialize node info: the node starts as a follower and starts its heartbeat-detection task
     */
    public static void raftNodeInit(ServerConfig self, List<ServerConfig> clusterConfig) {
        RaftNodeInfo.getInstance().setSelf(self);
        RaftNodeInfo.getInstance().setClusterConfig(clusterConfig);
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);
        createElectionTask();
    }

    /**
     * Send heartbeats
     */
    public synchronized static void sendHeartBeat() {
        RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
        boolean result = false;
        try {
            result = raftNode.callHeartBeatRequest(RaftNodeInfo.getInstance().getClusterConfig());
        } catch (ExecutionException | InterruptedException e) {
            log.debug(" {}: uh-oh, failed to send heartbeats as leader: {}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);
        }
        if (!result) {
            // The heartbeat round failed and the node state has already changed:
            // stop sending heartbeats and start the heartbeat-detection task
            heartBeatTestDestroy();
            createElectionTask();
        }
    }

    /**
     * Handle a heartbeat
     */
    public static void heartBeatHandler(HeartBeatRequest request, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            channel.writeAndFlush(new RpcRemoteMsg<HeartBeatResult>(raftNode.heartBeatHandler(request)));
            // A heartbeat arrived: cancel the current heartbeat-detection task and start a new one
            createElectionTask();
        });
    }

    // Command validation: only "SET key value" and "GET key" are supported
    public static boolean commandCheck(String command) {
        if (command == null) {
            return false;
        }
        String[] parts = command.split(" ");
        if ("SET".equals(parts[0])) {
            return parts.length >= 3;
        }
        return "GET".equals(parts[0]) && parts.length >= 2;
    }

    /**
     * Client request handling; for this KV store that means SET/GET. The response is kept simple.
     */
    public static void clientRequestHandler(ClientRequest request, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            ClientResponse clientResponse = ClientResponse.builder().build();
            clientResponse.setRequestId(request.getRequestId());
            if (!commandCheck(request.getCommand())) {
                clientResponse.setCode(401);
                clientResponse.setMsg("Invalid command format");
                channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));
                return;
            }
            // Only SET produces a log entry; GET just reads the data
            String[] command = request.getCommand().split(" ");
            if (command[0].equals("SET")) {
                RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
                try {
                    channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(raftNode.clientRequestHandler(request, RaftNodeInfo.getInstance().getClusterConfig())));
                } catch (ExecutionException | InterruptedException e) {
                    log.debug(" {}: failed to commit the log entry: {}", request.getCommand(), e.getMessage(), e);
                    clientResponse.setCode(500);
                    clientResponse.setMsg(e.getMessage());
                    channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));
                }
            } else {
                // GET: read the value directly
                clientResponse.setCode(200);
                clientResponse.setData(RaftNodeInfo.getInstance().getLogManage().getDataByKey(command[1]));
                channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));
            }
        });
    }


    /**
     * Log pre-commit request
     */
    public static void logPreCommitHandler(AppendEntriesPreCommit request, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            channel.writeAndFlush(new RpcRemoteMsg<AppendEntriesPreCommitResult>(raftNode.logPreCommitHandler(request)));
        });
    }

    /**
     * Log commit request
     */
    public static void logCommitHandler(AppendEntriesCommit request) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            raftNode.logCommitHandler(request);
            // A log entry arrived: cancel the current heartbeat-detection task and start a new one
            createElectionTask();
        });
    }

    /**
     * Start an election
     */
    public synchronized static void sendCallVote() {
        RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
        boolean result = false;
        try {
            result = raftNode.callVoteRequest(RaftNodeInfo.getInstance().getClusterConfig());
        } catch (ExecutionException | InterruptedException e) {
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.debug(" {}: uh-oh, failed to start an election as candidate: {}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);
        }
        if (!result) {
            // The election failed and the state has already changed:
            // start a new heartbeat-detection task
            createElectionTask();
            return;
        }
        // Election won: start the heartbeat task
        createHearBeatTask();
    }

    /**
     * Handle a vote request
     */
    public synchronized static void callVoteHandler(RequestVoteRPC requestVoteRPC, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            channel.writeAndFlush(new RpcRemoteMsg<RequestVoteResult>(raftNode.voteRequestHandler(requestVoteRPC)));
        });
    }

    /**
     * Send a log pull request
     */
    public synchronized static LogIndexPullResult sendLogPullRequest(List<Long> pullLogIndex) {
        if (CollectionUtil.isNotEmpty(pullLogIndex)) {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            return raftNode.sendLogPullRequest(pullLogIndex);
        }
        return null;
    }

    /**
     * Handle a log pull request
     */
    public synchronized static void logPullRequestHandler(LogIndexPull logIndexPull, Channel channel) {
        ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {
            RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());
            log.debug("{}: pulling logs: {}", channel.remoteAddress(), JSONObject.toJSON(logIndexPull));
            channel.writeAndFlush(new RpcRemoteMsg<LogIndexPullResult>(raftNode.logPullRequestHandler(logIndexPull)));
        });
    }


    /**
     * Cancel the current heartbeat-detection task and schedule a new one
     */
    public static void createElectionTask() {
        long randomTime = getRandomTime();
        final long intervalTime = INTERVAL_TIME + randomTime;
        // Cancel the previous task first
        electionTaskDestroy();
        // Then schedule the new one
        ScheduledFuture<?> schedule = ThreadPoolUtils.hearBeatAsyncPool.schedule(new ElectionTask(intervalTime), intervalTime, TimeUnit.MILLISECONDS);
        RaftNodeInfo.getInstance().setElectionTask(schedule);
    }

    /**
     * Cancel the current heartbeat task and schedule a new one
     */
    public static void createHearBeatTask() {
        // Cancel the previous task first
        heartBeatTestDestroy();
        // Then schedule the new one at a fixed rate
        ScheduledFuture<?> schedule = ThreadPoolUtils.hearBeatAsyncPool.scheduleAtFixedRate(new HeartBeatTask(), 0L, INTERVAL_TIME, TimeUnit.MILLISECONDS);
        // Store it as the heartbeat task so that heartBeatTestDestroy() can cancel it later
        RaftNodeInfo.getInstance().setHeartBeatTask(schedule);
    }


    public static long getRandomTime() {
        // Random extra delay in [250, 1000) ms, so detection is always slower than the heartbeat
        return RandomUtil.randomLong(250L, 1000L);
    }

    /**
     * Cancel the heartbeat-detection task
     */
    public static void electionTaskDestroy() {
        if (null != RaftNodeInfo.getInstance().getElectionTask()) {
            RaftNodeInfo.getInstance().getElectionTask().cancel(true);
            RaftNodeInfo.getInstance().setElectionTask(null);
        }
    }

    /**
     * Cancel the heartbeat task
     */
    public static void heartBeatTestDestroy() {
        if (null != RaftNodeInfo.getInstance().getHeartBeatTask()) {
            RaftNodeInfo.getInstance().getHeartBeatTask().cancel(true);
            RaftNodeInfo.getInstance().setHeartBeatTask(null);
        }
    }
}

The global node-info class

public class RaftNodeInfo {

    /**
     * This node
     */
    private ServerConfig self;

    /**
     * The other nodes in the cluster
     */
    private List<ServerConfig> clusterConfig;

    /**
     * Current node state, FOLLOW by default
     */
    private volatile NodeStatusEnums currentNodeStatus = NodeStatusEnums.FOLLOW;

    /**
     * Current term
     */
    private volatile long currentTerm = 0L;

    /**
     * Current leader
     */
    private volatile String currentLeaderId;

    /**
     * Index of the last log entry (committed)
     */
    private volatile long lastLogIndex = 0L;

    /**
     * Term of the last log entry (not used in this implementation)
     */
    private volatile long lastLogTerm = 0L;

    /**
     * Candidate this node voted for in the current term
     */
    private volatile String voteFor;

    /**
     * Last update time (heartbeat or log entry)
     **/
    private volatile long lastUpdateTime = 0L;

    /**
     * Heartbeat task
     **/
    private ScheduledFuture<?> heartBeatTask;

    /**
     * Heartbeat-detection (election) task
     **/
    private ScheduledFuture<?> electionTask;

    /**
     * Log manager
     **/
    private LogManage logManage;

    /**
     * Log file path
     **/
    private String logPath;
}

3. State machine

Provides node state transitions, heartbeat-result handling, vote-result handling, and log consistency handling.

public class StateMachines {
    private static final Logger log = LoggerFactory.getLogger(StateMachines.class);

    /** Candidate -> leader */
    public static void becomeLeader(){
        // Become the leader
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.LEADER);
        // The leader is this node itself
        RaftNodeInfo.getInstance().setCurrentLeader(RaftNodeInfo.getInstance().getSelf().toString());
        // Clear the vote
        RaftNodeInfo.getInstance().setVoteFor(null);
    }

    /** Follower -> candidate */
    public static void becomeCandidate(){
        // Become a candidate
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.CANDIDATE);
        // Term + 1
        RaftNodeInfo.getInstance().setCallVoteTerm();
        // Vote for itself
        RaftNodeInfo.getInstance().setVoteFor(RaftNodeInfo.getInstance().getSelf().toString());
    }

    /** Candidate or leader -> follower */
    public static void becomeFollow(long term,String leaderId,String voteFor){
        RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);
        RaftNodeInfo.getInstance().setCurrentLeader(leaderId);
        RaftNodeInfo.getInstance().setCurrentTerm(term);
        RaftNodeInfo.getInstance().setVoteFor(voteFor);
        RaftNodeInfo.getInstance().setLastUpdateTime(System.currentTimeMillis());
    }

    /**
     * Majority check shared by votes, heartbeat responses and pre-commit acks.
     * nodeNum is the number of OTHER nodes; this node counts as one more, so a majority
     * of the cluster (nodeNum + 1 nodes) needs (nodeNum + 1) / 2 positive peer responses.
     */
    private static boolean reachesMajority(int peerResponses, int nodeNum) {
        return peerResponses != 0 && peerResponses >= (nodeNum + 1) / 2;
    }

    /** Vote result handling */
    public static boolean voteResultHandler(List<Future<RequestVoteResult>> taskList,Integer nodeNum) throws ExecutionException, InterruptedException {
        int voteNum = 0;
        for (Future<RequestVoteResult> future : taskList) {
            RequestVoteResult voteResult = future.get();
            // If a leader is still alive, it will certainly veto me
            if (leaderIsLive(voteResult)) {
                return false;
            }
            if(voteResult!=null){
                log.debug("Vote result, my term: {}, result: {}",RaftNodeInfo.getInstance().getCurrentTerm(), JSONObject.toJSON(voteResult));
            }
            if (null != voteResult && voteResult.isVoteGranted()) {
                voteNum++;
            }
        }

        if (reachesMajority(voteNum, nodeNum)) {
            // Vote passed: become the leader
            StateMachines.becomeLeader();

            log.debug(" {}: ha, I've been promoted to leader", RaftNodeInfo.getInstance().getSelf().toString());
            return true;
        } else {
            // Vote failed: fall back to follower and lie low
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);

            log.debug(" {}: uh-oh, nobody backs me; I'll wait for another chance", RaftNodeInfo.getInstance().getSelf().toString());
            return false;
        }
    }

    // If a leader is still alive, it will certainly veto me
    private static boolean leaderIsLive(RequestVoteResult voteResult) {
        if (null != voteResult && StrUtil.isNotEmpty(voteResult.getLeaderId())) {
            // Vetoed by the leader: fall back to follower and lie low
            StateMachines.becomeFollow(voteResult.getTerm(), voteResult.getLeaderId(), null);
            return true;
        }
        return false;
    }

    /** Heartbeat result handling */
    public static boolean heartBeatResultHandler(List<Future<HeartBeatResult>> taskList,Integer nodeNum) throws ExecutionException, InterruptedException {
        int responseNum = 0;
        for (Future<HeartBeatResult> future : taskList) {
            HeartBeatResult heartBeatResult = future.get();
            if (null != heartBeatResult) {
                responseNum++;
            }
        }
        if (reachesMajority(responseNum, nodeNum)) {
            log.debug("{}: everyone is with me, keep it up", RaftNodeInfo.getInstance().getSelf().toString());
            return true;
        } else {
            // No responses, or too few for a majority: step down to follower and stop serving clients
            // State change
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.debug("{}: I can't reach my followers; I'm stopping service for now", RaftNodeInfo.getInstance().getSelf().toString());
            return false;
        }
    }

    /** Log pre-commit results */
    public static boolean logPreCommitHandler(List<Future<AppendEntriesPreCommitResult>> taskList, Integer nodeNum) throws ExecutionException, InterruptedException {
        int responseNum = 0;
        for (Future<AppendEntriesPreCommitResult> future : taskList) {
            AppendEntriesPreCommitResult preCommitResult = future.get();
            if (null != preCommitResult && preCommitResult.isSuccess()) {
                responseNum++;
            }
        }
        return reachesMajority(responseNum, nodeNum);
    }
}

4. Log module

public interface LogManage extends ResourceLifeCycle{

    /** Leader pre-commit */
    long preCommitLog(LogEntity logEntity);

    /** Follower pre-commit */
    void preCommitLog(long preCommitLogId,LogEntity logEntity);

    /** Remove an entry from the cache */
    void cacheLogRemove(long cacheLogId);

    /** Leader log commit */
    long commitLog(long cacheLogId);

    /** Follower log commit */
    void commitLog(long cacheLogId,long logIndex);

    /** Follower log check */
    void logIndexCheck();

    /** Get a log entry by its index */
    LogEntity getLogEntityByIndex(long logIndex, RandomAccessFile file);

    /** Apply a command to the data */
    void dataHandler(String command);

    /** Get data by key */
    String getDataByKey(String key);
}

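5. Scheduled tasks

  • ElectionTask: heartbeat-detection task; if the check fails, the node becomes a candidate
  • HeartBeatTask: heartbeat task; keeps sending heartbeats to followers to stop them from becoming candidates
  • LogIndexCheckTask: the follower's scheduled log-check task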
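ElectionTask behaves like a resettable one-shot timer: every heartbeat or log entry cancels the pending task and schedules a new one (see `createElectionTask`). Below is a minimal, self-contained sketch of that pattern with `ScheduledExecutorService`; `ResettableElectionTimer` is a hypothetical class, not part of the project.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical resettable election timer: cancel and reschedule on every heartbeat. */
class ResettableElectionTimer {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable onTimeout;   // e.g. "become candidate and start an election"
    private ScheduledFuture<?> pending;

    ResettableElectionTimer(Runnable onTimeout) {
        this.onTimeout = onTimeout;
    }

    /** Called at start-up and whenever a heartbeat or log entry arrives from the leader. */
    synchronized void reset(long delayMillis) {
        if (pending != null) {
            // false: do not interrupt a timeout handler that is already running
            pending.cancel(false);
        }
        pending = scheduler.schedule(onTimeout, delayMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The sketch cancels with `cancel(false)`, which leaves an already-running timeout handler alone; `electionTaskDestroy` and `heartBeatTestDestroy` in this article use `cancel(true)`, which interrupts the running task.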

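V. Core Flows

The flowcharts already make things fairly clear, so here I'll pick a few parts to discuss.

1. Election

The heartbeat interval is currently 1500 ms, and the heartbeat-detection timeout is 1750 ms plus a random 0 to 750 ms. (The random range used to be much narrower; combined with network latency and similar factors, that made it very likely for two candidates to end up with the same term.) When a follower receives a heartbeat it updates lastUpdateTime, and the detection task checks whether the time elapsed since then exceeds the detection interval; if it does, the node becomes a candidate and starts an election.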
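These numbers can be expressed directly: a timeout drawn from [1750, 2500) ms, and a check of the time elapsed since `lastUpdateTime`. `ElectionTimeout` is a hypothetical helper; it uses `ThreadLocalRandom` instead of the Hutool `RandomUtil` call in the article, with the same 250 to 1000 ms range (upper bound exclusive) added to the 1500 ms interval.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper mirroring the timing described above. */
class ElectionTimeout {
    static final long HEARTBEAT_INTERVAL_MS = 1500L;

    /** 1500 ms plus a random 250 (inclusive) to 1000 (exclusive) ms, i.e. [1750, 2500). */
    static long randomTimeoutMillis() {
        return HEARTBEAT_INTERVAL_MS + ThreadLocalRandom.current().nextLong(250L, 1000L);
    }

    /** True if nothing (heartbeat or log entry) has arrived for longer than timeoutMillis. */
    static boolean leaderLooksDead(long lastUpdateTimeMillis, long nowMillis, long timeoutMillis) {
        return nowMillis - lastUpdateTimeMillis > timeoutMillis;
    }
}
```

In practice `nowMillis` would be `System.currentTimeMillis()`, the same clock `becomeFollow` uses to stamp `lastUpdateTime`.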

CandidateRaftNode: sending the election RPC

The election RPC entity class

public class RequestVoteRPC extends RpcMsgId implements Serializable {

    /** The candidate's term */
    private long term;

    /** ID of the candidate requesting the vote (ip:selfPort) */
    private String candidateId;

    /** Index of the candidate's last log entry */
    private long lastLogIndex;

    /** Term of the candidate's last log entry */
    private long lastLogTerm;

}

The election method

    public boolean callVoteRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException {
        if (CollectionUtil.isEmpty(serverConfigs)) {
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.error("Single-node cluster: there is nobody to ask for votes");
            return false;
        }
        // A candidate sends vote requests
        RaftNodeInfo instance = RaftNodeInfo.getInstance();

        // A heartbeat or log entry may have arrived meanwhile and turned this node back into a follower
        if (!NODE_TYPE.equals(instance.getCurrentNodeStatus())) {
            return false;
        }
        log.debug(" {}: ha, I've started an election", instance.getSelf().toString());
        List<Future<RequestVoteResult>> taskList = new ArrayList<>(serverConfigs.size());

        // Together with its own vote, the candidate needs a majority of the whole cluster
        // (peers + 1); the counting is done in StateMachines.voteResultHandler.
        // Note: if a leader already exists and the candidate's log is not longer than the
        // leader's, the leader stays leader, because it has a veto
        for (ServerConfig serverConfig : serverConfigs) {
            Future<RequestVoteResult> voteResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> {
                // Build the vote request
                RequestVoteRPC voteRPC = RequestVoteRPC.builder().candidateId(instance.getSelf().toString())
                        .term(instance.getCurrentTerm())  // the term was already incremented on becoming a candidate
                        .lastLogIndex(instance.getLastLogIndex()).build();
                RpcSession<RequestVoteResult, RequestVoteRPC> voteRPCRpcSession = RpcSessionFactory.<RequestVoteResult, RequestVoteRPC>openSession(serverConfig, voteRPC);
                return voteRPCRpcSession == null ? null : voteRPCRpcSession.syncSend(1000L);
            });
            taskList.add(voteResultFuture);
        }
        // The state may have changed back to follower while the requests were being sent
        if (!NODE_TYPE.equals(instance.getCurrentNodeStatus())) {
            return false;
        }

        return StateMachines.voteResultHandler(taskList, serverConfigs.size());

    }

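Follower's response to a vote request

  1. If the candidate's term is higher than mine, I grant the vote
  2. If the candidate's term equals mine, its log is at least as long as mine, and I haven't voted yet (or already voted for this candidate), I also grant it

(A follower can vote only once per term.)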

    public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {
        // A follower has to handle vote requests
        RaftNodeInfo instance = RaftNodeInfo.getInstance();
        RequestVoteResult voteResult = RequestVoteResult.builder().term(instance.getCurrentTerm()).build();
        voteResult.setRequestId(voteRPC.getRequestId());
        // 1. The candidate's term is higher than mine: grant the vote right away
        if (voteRPC.getTerm() > instance.getCurrentTerm()) {
            return agreeVote(voteResult, voteRPC);
        }
        // 2. Same term, the candidate's log is at least as long as mine, and I haven't voted
        //    (or already voted for this candidate): I can only vote once per term
        if ((voteRPC.getTerm() == instance.getCurrentTerm() && voteRPC.getLastLogIndex() >= instance.getLastLogIndex())
                && (instance.getVoteFor() == null || instance.getVoteFor().equals(voteRPC.getCandidateId()))) {
            return agreeVote(voteResult, voteRPC);
        }
        voteResult.setTerm(instance.getCurrentTerm());
        voteResult.setVoteGranted(false);
        log.info(" {}: as a follower, I don't think you're up to it, so no vote for you: {}", instance.getSelf().toString(), voteRPC.getCandidateId());
        return voteResult;
    }

    private RequestVoteResult agreeVote(RequestVoteResult voteResult, RequestVoteRPC voteRPC) {

        voteResult.setTerm(RaftNodeInfo.getInstance().getCurrentTerm());
        voteResult.setVoteGranted(true);

        RaftNodeInfo.getInstance().setCurrentTerm(voteRPC.getTerm());
        RaftNodeInfo.getInstance().setVoteFor(voteRPC.getCandidateId());

        log.info(" {}: as a follower, I think you're up to it, so you have my vote: {}", RaftNodeInfo.getInstance().getSelf().toString(), voteRPC.getCandidateId());
        return voteResult;
    }

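Leader's response

Can a leader receive a vote request? Yes! If a follower receives a heartbeat late, or misses one, it starts an election, and the leader receives its vote request. What then? Compare terms and logs: if the candidate's term is at least the leader's and its log is longer, the leader must step down; otherwise the leader has a veto. (This prevents a follower from starting elections endlessly and incrementing its term forever.)

When a candidate's term is very large, the other followers will inevitably vote for it; if it then became leader, there would be two leaders at the same time. That is why the current leader needs a veto.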

    public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {
        // A leader may receive vote requests from candidates
        RaftNodeInfo instance = RaftNodeInfo.getInstance();
        RequestVoteResult requestVoteResult = RequestVoteResult.builder().build();
        requestVoteResult.setRequestId(voteRPC.getRequestId());
        // The candidate's term is at least mine and its log is longer: I'm out of date and step down
        if (voteRPC.getTerm() >= instance.getCurrentTerm() && voteRPC.getLastLogIndex() > instance.getLastLogIndex()) {
            // State change
            StateMachines.becomeFollow(voteRPC.getTerm(), voteRPC.getCandidateId(), null);

            requestVoteResult.setTerm(voteRPC.getTerm());
            requestVoteResult.setVoteGranted(true);
            log.info(" {}: as the current leader, I acknowledge your strength and step aside: {}", instance.getSelf().toString(), voteRPC.getCandidateId());
            return requestVoteResult;
        }
        log.info(" {}: as the current leader, I reject your bid for leadership: {}", instance.getSelf().toString(), voteRPC.getCandidateId());
        // Otherwise reject it, and tell the candidate who the leader is so that it backs off
        requestVoteResult.setTerm(instance.getCurrentTerm());
        requestVoteResult.setVoteGranted(false);
        requestVoteResult.setLeaderId(instance.getSelf().toString());
        return requestVoteResult;
    }

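2. Heartbeat

For heartbeats I added step-down based on the responses. Normally this isn't needed; my purpose here is to guard against network partitions!

Suppose the cluster originally looks like this:

Once a network partition happens, it turns into this, with two leaders. That's why heartbeat responses are crucial here: as soon as fewer than half respond, the leader should step down automatically.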
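Whether a side of a partition can keep (or elect) a leader depends only on whether it still holds a majority of the whole cluster, and at most one side can. A hypothetical helper, `PartitionQuorum`, makes the arithmetic concrete:

```java
/** Hypothetical illustration: which side of a two-way partition holds a majority? */
class PartitionQuorum {
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** Returns "left", "right" or "none": the side that can keep or elect a leader, if any. */
    static String sideWithMajority(int leftSize, int rightSize) {
        int clusterSize = leftSize + rightSize;
        if (leftSize >= majority(clusterSize)) {
            return "left";
        }
        if (rightSize >= majority(clusterSize)) {
            return "right";
        }
        return "none"; // e.g. an even split: neither side can make progress
    }
}
```

In a five-node cluster split into 2 and 3 nodes, an old leader on the 2-node side gets at most 1 heartbeat response, and 1 + 1 = 2 < 3, so it steps down, while the 3-node side can still elect a leader. A 2/2 split of a four-node cluster leaves neither side with a majority.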

LeaderRaftNode: sending heartbeats

    public boolean callHeartBeatRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException {

        if (CollectionUtil.isEmpty(serverConfigs)) {
            StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);
            log.debug(" {}: I'm the only node; there's nobody to send heartbeats to", RaftNodeInfo.getInstance().getSelf().toString());
            return false;
        }

        List<Future<HeartBeatResult>> taskList = new ArrayList<>(serverConfigs.size());

        // The leader sends heartbeats; to guard against network partitions it steps down
        // automatically when too few followers respond to form a majority
        for (ServerConfig serverConfig : serverConfigs) {
            Future<HeartBeatResult> heartBeatResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> {
                HeartBeatRequest build = HeartBeatRequest.builder()
                        .leaderId(RaftNodeInfo.getInstance().getSelf().toString())
                        .leaderLastCommitIndex(RaftNodeInfo.getInstance().getLastLogIndex())
                        .term(RaftNodeInfo.getInstance().getCurrentTerm()).build();
                RpcSession<HeartBeatResult, HeartBeatRequest> heartBeatRequestRpcSession = RpcSessionFactory.<HeartBeatResult, HeartBeatRequest>openSession(serverConfig, build);
                return heartBeatRequestRpcSession == null ? null : heartBeatRequestRpcSession.syncSend(200L);
            });
            taskList.add(heartBeatResultFuture);
        }
        // Process the responses
        return StateMachines.heartBeatResultHandler(taskList, serverConfigs.size());
    }

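3. Log

The log design is very rudimentary, so I won't say much about it; the goal here is to implement Raft, and performance is out of scope for now. Still, here are some test numbers. Since this is a KV store, the data has to be loaded into memory at startup, and reading a file of about 50 MB with 100,000 log entries currently takes about 8 s, which is clearly unreasonable. There is no log compaction or snapshotting yet, and no zero-copy either, because I didn't want to make things too complex.

For the log check, here are the results of two common test scenarios:

1. A new node joins and needs to pull all the data once

2. Entries are missing in the middle of the log

Both cases work correctly!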

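VI. Known Issues

Note: even with all of this, data can still be lost!

To stress it again: this article does not map one-to-one to the Raft paper and adds quite a few of my own ideas, so the whole process has been one of running into problems, thinking them through, and solving them, which is what learning is. The biggest remaining problem right now is this:

A newly joined node has already received data from the leader and updated its lastCommitIndex, but hasn't yet caught up on the older data from the leader. If the leader crashes at that moment, this node can win an election and become leader, and data may then be lost. This may not be obvious from the article alone; you'd need to look at the code. It's a serious bug. How would you fix it, and how does Raft solve it?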
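For reference, the Raft paper avoids this situation with two rules. Followers only append an entry when their log matches the leader's at the preceding index and term, so a follower's log never has gaps and its last index really describes what it holds. And under the election restriction (section 5.4.1 of the paper), a voter rejects a candidate whose log is less up-to-date than its own, comparing the term of the last entry first and the index only when the terms are equal. The sketch below shows only the second rule; `UpToDateCheck` is a hypothetical name, and the check relies on the lastLogTerm field that this project keeps but does not use.

```java
/** Sketch of the Raft paper's "at least as up-to-date" vote check; not this project's rule. */
class UpToDateCheck {
    /**
     * A voter grants its vote only if the candidate's log is at least as up-to-date
     * as its own: the later last term wins; with equal last terms, the longer log wins.
     */
    static boolean candidateLogIsUpToDate(long candidateLastTerm, long candidateLastIndex,
                                          long voterLastTerm, long voterLastIndex) {
        if (candidateLastTerm != voterLastTerm) {
            return candidateLastTerm > voterLastTerm;
        }
        return candidateLastIndex >= voterLastIndex;
    }
}
```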

There may of course be other problems as well; if you spot any, please point them out.

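VII. Summary

Only by understanding the essence can you keep up with how things evolve. Consensus algorithms are indispensable in distributed systems, and reading without practicing easily leaves you with high standards but poor execution. When I first read about Raft it seemed simple: three states doing different things, plus state transitions. Once I actually started building it, I found many details that needed thought, and this is only a demo. Looking back, the storage designs of RocketMQ and Kafka are really impressive. I learned a lot from building this.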

Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. 
To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

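About

A simplified implementation of the Raft consensus algorithm in Java, used to build a simple KV store. It implements leader election, heartbeats, log pulling, and log replication. The code is clearly structured and well commented, and test classes are provided for starting nodes. It does not follow the Raft paper exactly; the author made many modifications of their own.

Language: Java
License: Apache-2.0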

Clone:

  • HTTPS: https://gitee.com/colins0902/raft-java-demo.git
  • SSH: git@gitee.com:colins0902/raft-java-demo.git

Default branch: master