RocketMQ源码学习(二)-Broker启动
启动方法
org.apache.rocketmq.broker.BrokerStartup#main
public static void main(String[] args) { start(createBrokerController(args)); } 复制代码
核心组件BrokerController创建
org.apache.rocketmq.broker.BrokerStartup#createBrokerController
// ToDo: k1-> 创建核心组件BrokerController public static BrokerController createBrokerController(String[] args) { ... // ToDo: k1->Broker的核心配置信息 final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); // TLS安全相关 nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); // netty服务的监听端口 nettyServerConfig.setListenPort(10911); // Broker存储消息的一些配置 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); ... // ToDo: k2->判断是否基于Dledger技术来管理主从同步和CommitLog条件的就是将brokerId设置为-1 if (messageStoreConfig.isEnableDLegerCommitLog()) { brokerConfig.setBrokerId(-1); } ... // ToDo: k1->创建BrokerController final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); // BrokerController的初始化 boolean initResult = controller.initialize(); } 复制代码
BrokerController初始化
BrokerController属性
BrokerController有很多属性,捡几个介绍。
public BrokerController( // 四个核心组件 final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig ) { this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; this.nettyClientConfig = nettyClientConfig; this.messageStoreConfig = messageStoreConfig; // ToDo: k2->Broker各种功能对应的组件 // 管理Consumer消费offset this.consumerOffsetManager = new ConsumerOffsetManager(this); // 管理Topic配置 this.topicConfigManager = new TopicConfigManager(this); // 处理Consumer拉取消息请求 this.pullMessageProcessor = new PullMessageProcessor(this); this.pullRequestHoldService = new PullRequestHoldService(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); ... } 复制代码
BrokerController初始化
org.apache.rocketmq.broker.BrokerController#initialize
主要是关于Broker配置文件信息的加载,相关组件线程池的创建,以及一些定时任务。
// ToDo: k1->BrokerController的初始化 public boolean initialize() throws CloneNotSupportedException { // 加载磁盘配置信息,这些配置信息就使用到了MessageStoreConfig boolean result = this.topicConfigManager.load(); result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); result = result && this.consumerFilterManager.load(); if (result) { try { // 消息存储管理组件,管理磁盘上的消息的 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); // 如果启动Dledger,就初始化与Dledger相关的组件 if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); } ... } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } // 加载磁盘文件 result = result && this.messageStore.load(); // ToDo: k2->Broker的Netty组件,注意:Broker既是服务端(接收Producer和Consumer的请求),又是客户端(向NameServer和Consumer发送消息) if (result) { ... // 处理Consumer的pull请求的线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); ... // 定时进行统计Broker的任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); // 定时进行Consumer消费offset持久化到磁盘的任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); // 对Consumer的filter过滤器进行持久化的任务,从这里也可以知道消费者的filter是被下推到Broker来执行的 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); ... // 定时进行落后Commitlog分发的任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); // 设置NameServer的地址列表,可以设置加载,也可以发起运城请求加载 if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 定时获取NameServer地址 BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } ... // 初始化,使用SPI加载的服务,可扩展 initialTransaction(); initialAcl(); initialRpcHooks(); } return result; } 复制代码
初始化完成,开始启动
start启动
org.apache.rocketmq.broker.BrokerStartup#start
整个方法裁剪一下,就干了一件事,启动Broker。然后在start中启动了Broker包含的各种怪组件。部分代码如下:
// 启动Broker public void start() throws Exception { // 启动消息存储组件 if (this.messageStore != null) { this.messageStore.start(); } // 启动Netty服务,这样就可以接收请求了 if (this.remotingServer != null) { this.remotingServer.start(); } if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); } // 与文件相关的一个服务组件, 暂不深究 if (this.fileWatchService != null) { this.fileWatchService.start(); } // ToDo: k2->Broker的brokerOuterAPI可以理解为一个Netty客户端,往外发送请求,例如心跳 if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } // ToDo: k2->broker核心的心跳注册任务,可以深入理解 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } // 任务时间间隔默认30秒 }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); ... } 复制代码
以上就是Broker启动的整个流程。其整体结构如下:
接下来,来详细看一下Broker心跳注册的过程
Broker注册
org.apache.rocketmq.broker.BrokerController#registerBrokerAll
代码前半部分是Topic的相关配置。后面判断Broker是否需要注册:如下:
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { ... // ToDo: k1->关键代码,判断是否需要注册,然后调用doRegisterBrokerAll去正真的注册Broker if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } } 复制代码
org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
注册带代码逻辑的真正实现
// ToDo: k2->注册真正的核心代码 private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { // Broker向所有的NameServer发起请求,所以注册结果返回的是一个List List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( ... // 如果注册结果大于0,那么就取list中的第一个值 if (registerBrokerResultList.size() > 0) { RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); if (registerBrokerResult != null) { // 主节点地址 if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } // 从节点地址 this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); if (checkOrderConfig) { this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } } 复制代码
封装请求报文,向NameServer发请求;
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
获取服务列表信息,封装报文,注册:
// ToDo: k1: 获取NameServer所有的列表信息 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { ... // 向NameServer注册 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); // 注册完成,本地缓存 if (result != null) { registerBrokerResultList.add(result); } ... } 复制代码
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker
向netty真正发起请求的方法:
... // 封装一个网络请求 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); // 真正发送网络请求的地方,这个remotingClient就是一个NettyClinet RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); // 封装网络请求结果 assert response != null; 复制代码
至此注册完毕
作者:YuJian
链接:https://juejin.cn/post/7021042272905265159