#头条创作挑战赛#
一、前言
前置知识点:RocketMQ中Netty服务器RemotingServer发送请求的3种方式
上一篇我们大致讲了一下broker与客户端之间的网络交互组件Broker2Client,那么这一节,将详细的讲解Broker的服务元素据是如何传送给NameServer的;
二、源码导读
Broker组件是如何上报自身信息给NameServer的? 首先这里要说的是Broker组件启动流程是与之前讲的NameServer组件的启动流程,NameServer启动流程(二)是一摸一样的,Broker首先通过BrokerStartup的main方法,经过一系列的配置文件参数封装,然后创建BrokerController实例,最终调用BrokerController#initialize方法进行一些初始化操作,这些初始化操作具体是什么,将在后续文章中详细讲解。再调用BrokerController#start方法将一些内部组件进行启动。其中将服务元素据注册到NameServer上也是再里面进行的。我们定位到start方法注册的代码块中:
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
// 强制注册
this.registerBrokerAll(true, false, true);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 定时注册
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
可以看到在BrokerController#start方法中有2处调用了registerBrokerAll方法进行发路由注册,其中第一处是强制进行注册,第二处是定时注册,每隔10-60s之间,具体取决于我们的registerNameServerPeriod参数。
registerBrokerAll方法实际会调用BrokerOuterAPI的registerBrokerAll方法请求nameserver进行注册broker;
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 调用了TopicConfigManager的方法,用本地的topic元数据构建了一份数据
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager()
.buildTopicConfigSerializeWrapper();
// 判断是否有读写权限
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
// 遍历每一个topic元数据
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
// 将topicConfig进行复制一份
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp = new TopicConfig(
topicConfig.getTopicName(),
topicConfig.getReadQueueNums(),
topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission()
);
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// 如果说强制要注册,或者是判断下来需要去进行注册的
if (forceRegister
|| needRegister(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills()
)
) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
三、源码分析
- 构造方法;
- 获取nameserver地址;
- 向nameserver发起注册broker请求;
- 更新nameserver地址列表;
- 下线broker请求;
- 请求nameserver判断是否需要注册;
- 请求nameserver去查询所有的topic元数据;
- 请求nameserver获取所有的消费者偏移量;
- 请求nameserver获取所有的delay延迟消息偏移量;
- 请求nameserver获取所有的订阅消费组配置数据;
- 向netty客户端组件注册请求回调钩子;
1、构造方法
// broker对外输出api组件
public class BrokerOuterAPI {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
// netty客户端组件
private final RemotingClient remotingClient;
// 地址组件
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
// nameserver地址
private String nameSrvAddr = null;
// 固定数量线程池,线程数量是4~10个,用来跟nameserver发起注册broker请求
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(
4,
10,
1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32),
new ThreadFactoryImpl("brokerOutApi_thread_", true)
);
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig); // 构建了一个netty客户端
this.remotingClient.registerRPCHook(rpcHook);
}
}
2、获取nameserver地址
//获取nameserver地址
public String fetchNameServerAddr() {
try {
// 通过top addressing组件,去获取nameserver地址
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
}
}
} catch (Exception e) {
log.error("fetchNameServerAddr Exception", e);
}
return nameSrvAddr;
}
3、向nameserver发起注册broker请求
// broker可以在这里通过netty客户端组件,跟nameserver发起请求,注册broker
public List<RegisterBrokerResult> registerBrokerAll(
// 集群名称
final String clusterName,
// broker地址
final String brokerAddr,
// broker分组名称
final String brokerName,
// brokerid
final long brokerId,
// 高可用服务器地址
final String haServerAddr,
// topic元数据序列化组件
final TopicConfigSerializeWrapper topicConfigWrapper,
// 过滤服务器地址列表
final List<String> filterServerList,
// 是否oneway请求
final boolean oneway,
// 超时时间
final int timeoutMills,
// 是否启用压缩
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// 根据nameServer数量创建CountDownLatch
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// 遍历每个nameserver地址,发送一个注册请求
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor*ex.e**cute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
// 等待所有nameServer注册broker请求发送完毕
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
// 真正发起注册broker请求函数
private RegisterBrokerResult registerBroker(
final String namesrvAddr, // nameserver地址
final boolean oneway, // 是否oneway请求
final int timeoutMills, // 超时时间
final RegisterBrokerRequestHeader requestHeader, // 注册broker请求头
// 请求体
final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
if (oneway) {
try {
// 发送oneway请求
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
// 同步发送请求,并且等待指定时间,超时抛出超时异常
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
4、更新nameserver地址列表
// 更新nameserver地址列表
public void updateNameServerAddressList(final String addrs) {
List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";");
for (String addr : addrArray) {
lst.add(addr);
}
// 可以对我们的remoting client里面去更新一下namesever地址列表
this.remotingClient.updateNameServerAddressList(lst);
}
5、下线broker请求
// 把自己的broker去进行下线请求
public void unregisterBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) {
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) {
try {
// 对每个namesrvAddr发送下线请求
this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
} catch (Exception e) {
log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
}
}
}
}
public void unregisterBroker(
final String namesrvAddr,
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);
// 同步发送请求,并且等待指定时间,超时抛出超时异常
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
6、请求nameserver判断是否需要注册
// 找nameserver判断是否需要注册
public List<Boolean> needRegister(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final TopicConfigSerializeWrapper topicConfigWrapper,
final int timeoutMills) {
final List<Boolean> changedList = new CopyOnWriteArrayList<>();
// 他会去获取到nameserver地址列表
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(
nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
// 想要跟每个nameserver地址都去做一个通信
brokerOuterExecutor*ex.e**cute(new Runnable() {
@Override
public void run() {
try {
// 他会把请求头里面封装进去一些关键信息,通用rpc协议里面自定义信息的载体
QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.QUERY_DATA_VERSION,
requestHeader
);
request.setBody(topicConfigWrapper.getDataVersion().encode());
RemotingCommand response = remotingClient.invokeSync(
namesrvAddr, request, timeoutMills);
DataVersion nameServerDataVersion = null;
Boolean changed = false;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryDataVersionResponseHeader queryDataVersionResponseHeader = (QueryDataVersionResponseHeader) response
.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
changed = queryDataVersionResponseHeader.getChanged();
byte[] body = response.getBody();
if (body != null) {
nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
changed = true;
}
}
if (changed == null || changed) {
changedList.add(Boolean.TRUE);
}
}
default:
break;
}
log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
} catch (Exception e) {
changedList.add(Boolean.TRUE);
log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("query dataversion from nameserver countDownLatch await Exception", e);
}
}
return changedList;
}
7、请求nameserver去查询所有的topic元数据
// 可以找nameserver去查询所有的topic元数据
public TopicConfigSerializeWrapper getAllTopicConfig(
final String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
8、请求nameserver获取所有的消费者偏移量
// 获取所有的消费者偏移量
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
final String addr) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
// 同步发送请求,并且等待指定时间,超时抛出超时异常
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
9、请求nameserver获取所有的delay延迟消息偏移量
// 获取所有的delay延迟消息偏移量
public String getAllDelayOffset(
final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
// 同步发送请求,并且等待指定时间,超时抛出超时异常
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
10、请求nameserver获取所有的订阅消费组配置数据
// 获取所有的订阅消费组配置数据
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
final String addr) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
// 同步发送请求,并且等待指定时间,超时抛出超时异常
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
11、向netty客户端组件注册请求回调钩子
// 注册请求回调钩子
public void registerRPCHook(RPCHook rpcHook) {
remotingClient.registerRPCHook(rpcHook);
}
@Override
public void registerRPCHook(RPCHook rpcHook) {
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}