rocketmq主从同步源码分析 (rocketmq的broker持久化)

#头条创作挑战赛#

一、前言

前置知识点:RocketMQ中Netty服务器RemotingServer发送请求的3种方式

上一篇我们大致讲了一下broker与客户端之间的网络交互组件Broker2Client,那么这一节,将详细地讲解Broker的服务元数据是如何传送给NameServer的。

二、源码导读

Broker组件是如何上报自身信息给NameServer的?首先这里要说的是,Broker组件的启动流程与之前《NameServer启动流程(二)》中讲的NameServer组件的启动流程是一模一样的:Broker首先通过BrokerStartup的main方法,经过一系列的配置文件参数封装,然后创建BrokerController实例,最终调用BrokerController#initialize方法进行一些初始化操作,这些初始化操作具体是什么,将在后续文章中详细讲解。再调用BrokerController#start方法将一些内部组件进行启动,其中将服务元数据注册到NameServer上也是在这里面进行的。我们定位到start方法注册的代码块中:

// Excerpt from BrokerController#start: the two places where the broker registers itself.
// When the DLedger commit log is not enabled, run startProcessorByHa and
// handleSlaveSynchronize for the configured broker role, then register once.
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    startProcessorByHa(messageStoreConfig.getBrokerRole());
    handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
    // Forced registration: forceRegister is passed as true.
    this.registerBrokerAll(true, false, true);
}

// Periodic registration: first run after 10 * 1000 ms, then at a fixed rate whose period is
// registerNameServerPeriod clamped into the range [10000, 60000] ms.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            // Scheduled registration; whether it is forced comes from brokerConfig.isForceRegister().
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            // Catch everything so the failure is logged here instead of propagating out of run().
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

可以看到在BrokerController#start方法中有2处调用了registerBrokerAll方法进行路由注册,其中第一处是强制进行注册,第二处是定时注册:首次延迟10s执行,之后每隔10~60s执行一次,具体间隔取决于我们的registerNameServerPeriod参数(小于10s按10s计,大于60s按60s计)。

registerBrokerAll方法实际会调用BrokerOuterAPI的registerBrokerAll方法请求nameserver进行注册broker;

/**
 * Builds the topic configuration snapshot of this broker and, when required, registers the
 * broker through {@code doRegisterBrokerAll}.
 *
 * <p>If the broker permission is not both writable and readable, every topic in the snapshot
 * is replaced by a copy that carries the broker-level permission instead of its own.
 *
 * @param checkOrderConfig passed through to {@code doRegisterBrokerAll}
 * @param oneway           passed through to {@code doRegisterBrokerAll}
 * @param forceRegister    when {@code true} the {@code needRegister} check is skipped
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    // Snapshot of the locally known topic metadata.
    final TopicConfigSerializeWrapper topicConfigWrapper =
            this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    final boolean readableAndWritable =
            PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                    && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());

    if (!readableAndWritable) {
        // The broker is restricted: rebuild the table with the broker permission on each topic.
        final ConcurrentHashMap<String, TopicConfig> restrictedTable = new ConcurrentHashMap<>();
        for (TopicConfig original : topicConfigWrapper.getTopicConfigTable().values()) {
            final TopicConfig restricted = new TopicConfig(
                    original.getTopicName(),
                    original.getReadQueueNums(),
                    original.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            restrictedTable.put(original.getTopicName(), restricted);
        }
        topicConfigWrapper.setTopicConfigTable(restrictedTable);
    }

    // Nothing to do unless registration is forced or needRegister reports it is needed.
    if (!forceRegister
            && !needRegister(
                    this.brokerConfig.getBrokerClusterName(),
                    this.getBrokerAddr(),
                    this.brokerConfig.getBrokerName(),
                    this.brokerConfig.getBrokerId(),
                    this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        return;
    }
    doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}

三、源码分析

  1. 构造方法;
  2. 获取nameserver地址;
  3. 向nameserver发起注册broker请求;
  4. 更新nameserver地址列表;
  5. 下线broker请求;
  6. 请求nameserver判断是否需要注册;
  7. 请求指定broker(主从同步时为master)获取所有的topic元数据;
  8. 请求指定broker(主从同步时为master)获取所有的消费者偏移量;
  9. 请求指定broker(主从同步时为master)获取所有的delay延迟消息偏移量;
  10. 请求指定broker(主从同步时为master)获取所有的订阅消费组配置数据;
  11. 向netty客户端组件注册请求回调钩子;

1、构造方法

/**
 * Outbound API component of the broker: wraps a remoting client that the broker uses to send
 * requests to other nodes.
 */
public class BrokerOuterAPI {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    /** Netty-based remoting client used for every outbound request. */
    private final RemotingClient remotingClient;

    /** Addressing component built from {@code MixAll.getWSAddr()}; used to fetch name server addresses. */
    private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());

    /** Last name server address string that was fetched; {@code null} until the first fetch. */
    private String nameSrvAddr;

    /**
     * Executor on which per-name-server requests are run.
     * NOTE(review): the arguments presumably follow ThreadPoolExecutor's order
     * (core 4, max 10, keep-alive 1 minute, queue capacity 32) — confirm.
     */
    private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(
            4, 10, 1, TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(32),
            new ThreadFactoryImpl("brokerOutApi_thread_", true));

    /**
     * Creates the API without an RPC hook.
     *
     * @param nettyClientConfig configuration for the underlying Netty client
     */
    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null);
    }

    /**
     * Creates the API and registers the given RPC hook on the underlying client.
     *
     * @param nettyClientConfig configuration for the underlying Netty client
     * @param rpcHook           hook to register on the client; may be {@code null}
     */
    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
        this.remotingClient.registerRPCHook(rpcHook);
    }
}

2、获取nameserver地址

/**
 * Fetches the name server address string through the addressing component and, if it differs
 * from the cached one, pushes the new list to the remoting client and updates the cache.
 *
 * @return the cached name server address string after the attempt (possibly unchanged or
 *         {@code null}); any exception during the fetch is logged, not propagated
 */
public String fetchNameServerAddr() {
    try {
        final String fetched = this.topAddressing.fetchNSAddr();
        final boolean changed = fetched != null && !fetched.equals(this.nameSrvAddr);
        if (changed) {
            log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, fetched);
            this.updateNameServerAddressList(fetched);
            this.nameSrvAddr = fetched;
        }
    } catch (Exception e) {
        log.error("fetchNameServerAddr Exception", e);
    }
    return nameSrvAddr;
}

3、向nameserver发起注册broker请求

// broker可以在这里通过netty客户端组件,跟nameserver发起请求,注册broker
public List<RegisterBrokerResult> registerBrokerAll(
        // 集群名称
        final String clusterName,
        // broker地址
        final String brokerAddr,
        // broker分组名称
        final String brokerName,
        // brokerid
        final long brokerId,
        // 高可用服务器地址
        final String haServerAddr,
        // topic元数据序列化组件
        final TopicConfigSerializeWrapper topicConfigWrapper,
        // 过滤服务器地址列表
        final List<String> filterServerList,
        // 是否oneway请求
        final boolean oneway,
        // 超时时间
        final int timeoutMills,
        // 是否启用压缩
        final boolean compressed) {

    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);

        // 根据nameServer数量创建CountDownLatch
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());

        // 遍历每个nameserver地址,发送一个注册请求
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor*ex.e**cute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            // 等待所有nameServer注册broker请求发送完毕
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}
/**
 * Sends a single REGISTER_BROKER request to one name server.
 *
 * @param namesrvAddr   address of the name server to contact
 * @param oneway        when {@code true} the request is sent one-way and {@code null} is returned
 * @param timeoutMills  timeout in milliseconds passed to the remoting client
 * @param requestHeader header of the registration request
 * @param body          request body
 * @return the decoded registration result, or {@code null} for a one-way request
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
 */
private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
    final RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);

    if (oneway) {
        try {
            // Fire-and-forget: no response is awaited.
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }

    // Synchronous call; the client throws a timeout exception if no reply arrives in time.
    final RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;

    if (response.getCode() != ResponseCode.SUCCESS) {
        throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
    }

    final RegisterBrokerResponseHeader responseHeader =
            (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResult registered = new RegisterBrokerResult();
    registered.setMasterAddr(responseHeader.getMasterAddr());
    registered.setHaServerAddr(responseHeader.getHaServerAddr());
    if (response.getBody() != null) {
        registered.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
    }
    return registered;
}

4、更新nameserver地址列表

/**
 * Splits a semicolon-separated address string and hands the resulting list to the remoting
 * client as its new name server address list.
 *
 * @param addrs name server addresses separated by {@code ;}
 */
public void updateNameServerAddressList(final String addrs) {
    final String[] parts = addrs.split(";");
    final List<String> addressList = new ArrayList<String>();
    for (int i = 0; i < parts.length; i++) {
        addressList.add(parts[i]);
    }
    this.remotingClient.updateNameServerAddressList(addressList);
}

5、下线broker请求

/**
 * Sends an unregister request for this broker to every name server known to the remoting
 * client. A failure against one name server is logged and does not stop the others.
 *
 * @param clusterName cluster name
 * @param brokerAddr  broker address
 * @param brokerName  broker name
 * @param brokerId    broker id
 */
public void unregisterBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) {
    final List<String> nameServers = this.remotingClient.getNameServerAddressList();
    if (nameServers == null) {
        return;
    }
    for (final String target : nameServers) {
        try {
            this.unregisterBroker(target, clusterName, brokerAddr, brokerName, brokerId);
            log.info("unregisterBroker OK, NamesrvAddr: {}", target);
        } catch (Exception e) {
            log.warn("unregisterBroker Exception, {}", target, e);
        }
    }
}
/**
 * Sends a synchronous UNREGISTER_BROKER request (3000 ms timeout) to one name server.
 *
 * @param namesrvAddr address of the name server to contact
 * @param clusterName cluster name
 * @param brokerAddr  broker address
 * @param brokerName  broker name
 * @param brokerId    broker id
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
 */
public void unregisterBroker(
    final String namesrvAddr,
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
    final UnRegisterBrokerRequestHeader header = new UnRegisterBrokerRequestHeader();
    header.setBrokerAddr(brokerAddr);
    header.setBrokerId(brokerId);
    header.setBrokerName(brokerName);
    header.setClusterName(clusterName);

    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, header);

    // Synchronous call; the client throws a timeout exception if no reply arrives in time.
    final RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        return;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}

6、请求nameserver判断是否需要注册

// 找nameserver判断是否需要注册
public List<Boolean> needRegister(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final int timeoutMills) {
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    // 他会去获取到nameserver地址列表
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final CountDownLatch countDownLatch = new CountDownLatch(
                nameServerAddressList.size());

        for (final String namesrvAddr : nameServerAddressList) {
            // 想要跟每个nameserver地址都去做一个通信
            brokerOuterExecutor*ex.e**cute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 他会把请求头里面封装进去一些关键信息,通用rpc协议里面自定义信息的载体
                        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
                        requestHeader.setBrokerAddr(brokerAddr);
                        requestHeader.setBrokerId(brokerId);
                        requestHeader.setBrokerName(brokerName);
                        requestHeader.setClusterName(clusterName);

                        RemotingCommand request = RemotingCommand.createRequestCommand(
                                RequestCode.QUERY_DATA_VERSION,
                                requestHeader
                        );
                        request.setBody(topicConfigWrapper.getDataVersion().encode());

                        RemotingCommand response = remotingClient.invokeSync(
                                namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader = (QueryDataVersionResponseHeader) response
                                        .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
                                    if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                            }
                            default:
                                break;
                        }
                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
                    } catch (Exception e) {
                        changedList.add(Boolean.TRUE);
                        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });

        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
    }
    return changedList;
}

7、请求指定broker(主从同步时为master)获取所有的topic元数据

/**
 * Fetches all topic configuration from the node at {@code addr} with a synchronous
 * GET_ALL_TOPIC_CONFIG request (3000 ms timeout). The request is sent to the address returned
 * by {@code MixAll.brokerVIPChannel(true, addr)}.
 * NOTE(review): the VIP-channel helper suggests {@code addr} is a broker address rather than
 * a name server — confirm against the caller.
 *
 * @param addr address of the node to query
 * @return the decoded topic configuration wrapper
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
 */
public TopicConfigSerializeWrapper getAllTopicConfig(
    final String addr) throws RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException, InterruptedException, MQBrokerException {
    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);

    final RemotingCommand response =
        this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

8、请求指定broker(主从同步时为master)获取所有的消费者偏移量

/**
 * Fetches all consumer offsets from the node at {@code addr} with a synchronous
 * GET_ALL_CONSUMER_OFFSET request (3000 ms timeout).
 *
 * @param addr address of the node to query
 * @return the decoded consumer offset wrapper
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
 */
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);

    // Synchronous call; the client throws a timeout exception if no reply arrives in time.
    final RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

9、请求指定broker(主从同步时为master)获取所有的delay延迟消息偏移量

/**
 * Fetches all delay-message offsets from the node at {@code addr} with a synchronous
 * GET_ALL_DELAY_OFFSET request (3000 ms timeout).
 *
 * @param addr address of the node to query
 * @return the response body decoded as a string using {@code MixAll.DEFAULT_CHARSET}
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
 * @throws UnsupportedEncodingException if {@code MixAll.DEFAULT_CHARSET} is not supported
 */
public String getAllDelayOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
    RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);

    // Synchronous call; the client throws a timeout exception if no reply arrives in time.
    final RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

10、请求指定broker(主从同步时为master)获取所有的订阅消费组配置数据

/**
 * Fetches all subscription group configuration from the node at {@code addr} with a
 * synchronous GET_ALL_SUBSCRIPTIONGROUP_CONFIG request (3000 ms timeout).
 *
 * @param addr address of the node to query
 * @return the decoded subscription group wrapper
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
 */
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);

    // Synchronous call; the client throws a timeout exception if no reply arrives in time.
    final RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

11、向netty客户端组件注册请求回调钩子

/**
 * Registers an RPC hook by delegating to the underlying remoting client.
 *
 * @param rpcHook hook to register
 */
public void registerRPCHook(RPCHook rpcHook) {
    this.remotingClient.registerRPCHook(rpcHook);
}
/**
 * Adds the hook to {@code rpcHooks} unless it is {@code null} or already present.
 *
 * @param rpcHook hook to register; {@code null} is ignored
 */
@Override
public void registerRPCHook(RPCHook rpcHook) {
    if (rpcHook == null || rpcHooks.contains(rpcHook)) {
        return;
    }
    rpcHooks.add(rpcHook);
}