阅读 116

RocketMQ源码学习(二)-Broker启动

启动方法

org.apache.rocketmq.broker.BrokerStartup#main

public static void main(String[] args) {     start(createBrokerController(args)); } 复制代码

核心组件BrokerController创建

org.apache.rocketmq.broker.BrokerStartup#createBrokerController

// ToDo: k1-> 创建核心组件BrokerController public static BrokerController createBrokerController(String[] args) { ...     // ToDo: k1->Broker的核心配置信息     final BrokerConfig brokerConfig = new BrokerConfig();     final NettyServerConfig nettyServerConfig = new NettyServerConfig();     final NettyClientConfig nettyClientConfig = new NettyClientConfig();     // TLS安全相关     nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,         String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));     // netty服务的监听端口     nettyServerConfig.setListenPort(10911);     // Broker存储消息的一些配置     final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); ...     // ToDo: k2->判断是否基于Dledger技术来管理主从同步和CommitLog条件的就是将brokerId设置为-1     if (messageStoreConfig.isEnableDLegerCommitLog()) {         brokerConfig.setBrokerId(-1);     } ...     // ToDo: k1->创建BrokerController     final BrokerController controller = new BrokerController(         brokerConfig,         nettyServerConfig,         nettyClientConfig,         messageStoreConfig);     // remember all configs to prevent discard     controller.getConfiguration().registerConfig(properties);     // BrokerController的初始化     boolean initResult = controller.initialize(); } 复制代码

BrokerController初始化

BrokerController属性

BrokerController有很多属性,捡几个介绍。

public BrokerController(     // 四个核心组件     final BrokerConfig brokerConfig,     final NettyServerConfig nettyServerConfig,     final NettyClientConfig nettyClientConfig,     final MessageStoreConfig messageStoreConfig ) {     this.brokerConfig = brokerConfig;     this.nettyServerConfig = nettyServerConfig;     this.nettyClientConfig = nettyClientConfig;     this.messageStoreConfig = messageStoreConfig;     // ToDo: k2->Broker各种功能对应的组件     // 管理Consumer消费offset     this.consumerOffsetManager = new ConsumerOffsetManager(this);     // 管理Topic配置     this.topicConfigManager = new TopicConfigManager(this);     // 处理Consumer拉取消息请求     this.pullMessageProcessor = new PullMessageProcessor(this);     this.pullRequestHoldService = new PullRequestHoldService(this);     this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); ... } 复制代码

BrokerController初始化

org.apache.rocketmq.broker.BrokerController#initialize

主要是关于Broker配置文件信息的加载,相关组件线程池的创建,以及一些定时任务。

// ToDo: k1->BrokerController的初始化 public boolean initialize() throws CloneNotSupportedException {     // 加载磁盘配置信息,这些配置信息就使用到了MessageStoreConfig     boolean result = this.topicConfigManager.load();     result = result && this.consumerOffsetManager.load();     result = result && this.subscriptionGroupManager.load();     result = result && this.consumerFilterManager.load();     if (result) {         try {             // 消息存储管理组件,管理磁盘上的消息的             this.messageStore =                 new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,                     this.brokerConfig);             // 如果启动Dledger,就初始化与Dledger相关的组件             if (messageStoreConfig.isEnableDLegerCommitLog()) {                 DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);                 ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);             } ...         } catch (IOException e) {             result = false;             log.error("Failed to initialize", e);         }     }     // 加载磁盘文件     result = result && this.messageStore.load();     // ToDo: k2->Broker的Netty组件,注意:Broker既是服务端(接收Producer和Consumer的请求),又是客户端(向NameServer和Consumer发送消息)     if (result) { ...         // 处理Consumer的pull请求的线程池         this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(             this.brokerConfig.getPullMessageThreadPoolNums(),             this.brokerConfig.getPullMessageThreadPoolNums(),             1000 * 60,             TimeUnit.MILLISECONDS,             this.pullThreadPoolQueue,             new ThreadFactoryImpl("PullMessageThread_"));  ...         // 定时进行统计Broker的任务         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {             @Override             public void run() {                 try {                     BrokerController.this.getBrokerStats().record();                 } catch (Throwable e) {                     log.error("schedule record error.", e);                 }             }         }, initialDelay, period, TimeUnit.MILLISECONDS);         // 定时进行Consumer消费offset持久化到磁盘的任务         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {             @Override             public void run() {                 try {                     BrokerController.this.consumerOffsetManager.persist();                 } catch (Throwable e) {                     log.error("schedule persist consumerOffset error.", e);                 }             }         }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);         // 对Consumer的filter过滤器进行持久化的任务,从这里也可以知道消费者的filter是被下推到Broker来执行的         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {             @Override             public void run() {                 try {                     BrokerController.this.consumerFilterManager.persist();                 } catch (Throwable e) {                     log.error("schedule persist consumer filter error.", e);                 }             }         }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); ...         // 定时进行落后Commitlog分发的任务         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {             @Override             public void run() {                 try {                     log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());                 } catch (Throwable e) {                     log.error("schedule dispatchBehindBytes error.", e);                 }             }         }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);         // 设置NameServer的地址列表,可以设置加载,也可以发起运城请求加载         if (this.brokerConfig.getNamesrvAddr() != null) {             this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());             log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());         } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {                 @Override                 public void run() {                     try {                         // 定时获取NameServer地址                         BrokerController.this.brokerOuterAPI.fetchNameServerAddr();                     } catch (Throwable e) {                         log.error("ScheduledTask fetchNameServerAddr exception", e);                     }                 }             }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);         } ...         // 初始化,使用SPI加载的服务,可扩展         initialTransaction();         initialAcl();         initialRpcHooks();     }     return result; } 复制代码

初始化完成,开始启动

start启动

org.apache.rocketmq.broker.BrokerStartup#start

整个方法裁剪一下,就干了一件事,启动Broker。然后在start中启动了Broker包含的各种怪组件。部分代码如下:

// 启动Broker public void start() throws Exception {     // 启动消息存储组件     if (this.messageStore != null) {         this.messageStore.start();     }     // 启动Netty服务,这样就可以接收请求了     if (this.remotingServer != null) {         this.remotingServer.start();     }     if (this.fastRemotingServer != null) {         this.fastRemotingServer.start();     }     // 与文件相关的一个服务组件, 暂不深究     if (this.fileWatchService != null) {         this.fileWatchService.start();     }     // ToDo: k2->Broker的brokerOuterAPI可以理解为一个Netty客户端,往外发送请求,例如心跳     if (this.brokerOuterAPI != null) {         this.brokerOuterAPI.start();     }     // ToDo: k2->broker核心的心跳注册任务,可以深入理解     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {         @Override         public void run() {             try {                     BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());             } catch (Throwable e) {                 log.error("registerBrokerAll Exception", e);             }         }         // 任务时间间隔默认30秒     }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);     ... } 复制代码

以上就是Broker启动的整个流程。其整体结构如下:

image.png

接下来,来详细看一下Broker心跳注册的过程

Broker注册

org.apache.rocketmq.broker.BrokerController#registerBrokerAll

代码前半部分是Topic的相关配置。后面判断Broker是否需要注册:如下:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { ...         // ToDo: k1->关键代码,判断是否需要注册,然后调用doRegisterBrokerAll去正真的注册Broker         if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),         this.getBrokerAddr(),         this.brokerConfig.getBrokerName(),         this.brokerConfig.getBrokerId(),         this.brokerConfig.getRegisterBrokerTimeoutMills())) {         doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);     } } 复制代码

org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll

注册带代码逻辑的真正实现

// ToDo: k2->注册真正的核心代码 private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,     TopicConfigSerializeWrapper topicConfigWrapper) {     // Broker向所有的NameServer发起请求,所以注册结果返回的是一个List     List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( ...     // 如果注册结果大于0,那么就取list中的第一个值     if (registerBrokerResultList.size() > 0) {         RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);         if (registerBrokerResult != null) {             // 主节点地址             if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {                 this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());             }             // 从节点地址             this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());             if (checkOrderConfig) {                 this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());             }         }     } } 复制代码

封装请求报文,向NameServer发请求;

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

获取服务列表信息,封装报文,注册:

// ToDo: k1: 获取NameServer所有的列表信息 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) {     brokerOuterExecutor.execute(new Runnable() { ...                 // 向NameServer注册                 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);                 // 注册完成,本地缓存                 if (result != null) {                     registerBrokerResultList.add(result);                 } ... } 复制代码

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker

向netty真正发起请求的方法:

... // 封装一个网络请求 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); // 真正发送网络请求的地方,这个remotingClient就是一个NettyClinet RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); // 封装网络请求结果 assert response != null; 复制代码

至此注册完毕

image.png


作者:YuJian
链接:https://juejin.cn/post/7021042272905265159


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐