
16 RocketMQ Binder: Integrating Message Subscription

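AbstractMessageChannelBinder defines the contract for creating a MessageProducer: the createConsumerEndpoint method, which is invoked during Binder initialization.

RocketMQMessageChannelBinder implements this method to create and initialize the RocketMQInboundChannelAdapter.

The createConsumerEndpoint method of RocketMQMessageChannelBinder: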

```java
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
        String group,
        ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
        throws Exception {
    // A consumer group is mandatory for every subscription.
    if (group == null || "".equals(group)) {
        throw new RuntimeException(
                "'group must be configured for channel " + destination.getName());
    }

    // The listener container wraps the RocketMQ client API.
    RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
            consumerProperties, rocketBinderConfigurationProperties, this);
    listenerContainer.setConsumerGroup(group);
    listenerContainer.setTopic(destination.getName());
    listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
    listenerContainer.setSuspendCurrentQueueTimeMillis(
            consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
    listenerContainer.setDelayLevelWhenNextConsume(
            consumerProperties.getExtension().getDelayLevelWhenNextConsume());
    listenerContainer
            .setNameServer(rocketBinderConfigurationProperties.getNameServer());
    listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));

    // The adapter holds the container and is returned as the MessageProducer.
    RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
            listenerContainer, consumerProperties, instrumentationManager);

    topicInUse.put(destination.getName(), group);

    ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
            group, consumerProperties);

    if (consumerProperties.getMaxAttempts() > 1) {
        // More than one attempt: retry, then hand over to the recoverer.
        rocketInboundChannelAdapter
                .setRetryTemplate(buildRetryTemplate(consumerProperties));
        rocketInboundChannelAdapter
                .setRecoveryCallback(errorInfrastructure.getRecoverer());
    }
    else {
        // Single attempt: failures go straight to the error channel.
        rocketInboundChannelAdapter
                .setErrorChannel(errorInfrastructure.getErrorChannel());
    }
    return rocketInboundChannelAdapter;
}
```

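RocketMQInboundChannelAdapter is an adapter: it subscribes to messages, converts their format, and plugs into Spring's retry and recovery-callback mechanism (RetryTemplate and RecoveryCallback). RocketMQListenerBindingContainer wraps the RocketMQ client API, and the adapter holds a reference to it.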
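To make the format-conversion part of the adapter's job concrete, here is a minimal sketch in plain Java. It does not use the real RocketMQ or Spring classes: BrokerMessage, ChannelMessage and SketchInboundAdapter are hypothetical stand-ins, and the "topic" header name is an assumption chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message as the broker client delivers it.
class BrokerMessage {
    final String topic;
    final byte[] body;
    final Map<String, String> properties;

    BrokerMessage(String topic, byte[] body, Map<String, String> properties) {
        this.topic = topic;
        this.body = body;
        this.properties = properties;
    }
}

// Hypothetical stand-in for the payload-plus-headers message sent to a channel.
class ChannelMessage {
    final String payload;
    final Map<String, Object> headers;

    ChannelMessage(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(headers);
    }
}

// Sketch of the adapter role: translate the broker format into the channel format.
class SketchInboundAdapter {
    ChannelMessage convert(BrokerMessage source) {
        // Copy broker properties into headers and record the topic as a header too.
        Map<String, Object> headers = new HashMap<>(source.properties);
        headers.put("topic", source.topic);
        // Assumes the body is UTF-8 text; a real binder delegates this to converters.
        String payload = new String(source.body, StandardCharsets.UTF_8);
        return new ChannelMessage(payload, headers);
    }
}
```

In the real binder the header mapping is delegated to the header mapper configured in createConsumerEndpoint; the sketch inlines it to keep the idea visible.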

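RocketMQ offers two consumption modes: orderly and concurrent. On top of the RocketMQ client API, the default listener for orderly consumption is DefaultMessageListenerOrderly, and the default listener for concurrent consumption is DefaultMessageListenerConcurrently. In either mode, every message the listener receives is passed on to the RocketMQListener callback.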
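The point that both modes end in the same callback can be sketched as follows. This is a simplified model, not the RocketMQ client: SketchListener and SketchListenerContainer are hypothetical names, the pool size of 4 is arbitrary, and "orderly" is reduced to sequential delivery on the calling thread.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the single callback both consumption modes invoke.
interface SketchListener {
    void onMessage(String message);
}

// Hypothetical container that delivers a batch either in order or concurrently.
class SketchListenerContainer {
    private final SketchListener listener;
    private final boolean orderly;

    SketchListenerContainer(SketchListener listener, boolean orderly) {
        this.listener = listener;
        this.orderly = orderly;
    }

    void deliver(List<String> batch) {
        if (orderly) {
            // Orderly mode: one message at a time, in arrival order.
            for (String message : batch) {
                listener.onMessage(message);
            }
            return;
        }
        // Concurrent mode: messages are handed to a thread pool, order is not guaranteed.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String message : batch) {
            pool.execute(() -> listener.onMessage(message));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Whichever branch runs, the listener implementation stays the same, which is why the binder only needs to supply one RocketMQListener.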

RocketMQInboundChannelAdapter creates and initializes the RocketMQListener implementation, BindingRocketMQListener, in its onInit method:

```java
@Override
protected void onInit() {
    if (consumerProperties == null
            || !consumerProperties.getExtension().getEnabled()) {
        return;
    }
    super.onInit();
    if (this.retryTemplate != null) {
        Assert.state(getErrorChannel() == null,
                "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
                        + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
                        + "send an error message when retries are exhausted");
    }

    // Register the listener that the container calls back for every message.
    BindingRocketMQListener listener = new BindingRocketMQListener();
    rocketMQListenerContainer.setRocketMQListener(listener);

    if (retryTemplate != null) {
        this.retryTemplate.registerListener(listener);
    }

    try {
        rocketMQListenerContainer.afterPropertiesSet();
    }
    catch (Exception e) {
        log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
        throw new IllegalArgumentException(
                "rocketMQListenerContainer init error: " + e.getMessage(), e);
    }

    instrumentationManager.addHealthInstrumentation(
            new Instrumentation(rocketMQListenerContainer.getTopic()
                    + rocketMQListenerContainer.getConsumerGroup()));
}

protected class BindingRocketMQListener
        implements RocketMQListener<Message>, RetryListener {

    @Override
    public void onMessage(Message message) {
        boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
        if (enableRetry) {
            // Retry sendMessage; fall back to the recovery callback when exhausted.
            RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
                RocketMQInboundChannelAdapter.this.sendMessage(message);
                return null;
            }, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
        }
        else {
            RocketMQInboundChannelAdapter.this.sendMessage(message);
        }
    }

    @Override
    public <T, E extends Throwable> boolean open(RetryContext context,
            RetryCallback<T, E> callback) {
        return true;
    }

    @Override
    public <T, E extends Throwable> void close(RetryContext context,
            RetryCallback<T, E> callback, Throwable throwable) {
    }

    @Override
    public <T, E extends Throwable> void onError(RetryContext context,
            RetryCallback<T, E> callback, Throwable throwable) {
    }
}
```
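The retry-then-recover behaviour that onMessage relies on can be illustrated with a small stand-alone sketch. SketchRetryTemplate is a hypothetical class written for this article, not Spring Retry's RetryTemplate: it has no back-off policy, retries only on RuntimeException, and uses plain functional interfaces in place of RetryCallback and RecoveryCallback.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, heavily simplified model of "retry N times, then recover".
class SketchRetryTemplate {
    private final int maxAttempts;

    SketchRetryTemplate(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    <T> T execute(Supplier<T> action, Function<RuntimeException, T> recovery) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Success on any attempt ends the loop immediately.
                return action.get();
            }
            catch (RuntimeException e) {
                // Remember the failure and try again if attempts remain.
                lastFailure = e;
            }
        }
        // All attempts failed: hand the last failure to the recovery callback.
        return recovery.apply(lastFailure);
    }
}
```

In the binder, the recovery step is the recoverer obtained from the error infrastructure, so a message that exhausts its retries is handed over for error handling rather than silently dropped.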

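After DefaultMessageListenerOrderly receives a RocketMQ message, it first calls back the onMessage method of BindingRocketMQListener, which then calls the sendMessage method inherited from the parent class of RocketMQInboundChannelAdapter to send the message to the DirectChannel.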

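Spring Cloud Stream uses the same message model for receiving as for sending. A message received by the Binder is first sent to a MessageChannel; the subscribed channel then forwards it through its Dispatcher to the corresponding MessageHandler for processing.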
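The channel, dispatcher and handler relationship can be modelled in a few lines. This is a sketch under assumptions, not Spring Integration code: SketchMessageHandler and SketchDirectChannel are hypothetical names, the dispatcher is folded into the channel, and the round-robin choice among subscribers is a simplification of how a point-to-point channel picks one handler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a message handler subscribed to a channel.
interface SketchMessageHandler {
    void handleMessage(String payload);
}

// Hypothetical point-to-point channel: each message goes to exactly one subscriber,
// on the sender's thread.
class SketchDirectChannel {
    private final List<SketchMessageHandler> handlers = new CopyOnWriteArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    boolean subscribe(SketchMessageHandler handler) {
        return handlers.add(handler);
    }

    boolean send(String payload) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        // Dispatch step: pick one handler in round-robin order and call it synchronously.
        int index = Math.floorMod(next.getAndIncrement(), handlers.size());
        handlers.get(index).handleMessage(payload);
        return true;
    }
}
```

Because dispatch happens on the sending thread, an exception thrown by the handler propagates back to the sender, which is what lets the retry logic in the adapter see handler failures.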


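The getOutputChannel() method of MessageProducerSupport, the parent class of RocketMQInboundChannelAdapter, returns the MessageChannel. This is the DirectChannel that was passed in when the RocketMQ Binder was initialized.

The getOutputChannel method of MessageProducerSupport: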

```java
@Override
public MessageChannel getOutputChannel() {
    // Resolve the channel by name on first use, then drop the name.
    if (this.outputChannelName != null) {
        synchronized (this) {
            if (this.outputChannelName != null) {
                this.outputChannel = getChannelResolver().resolveDestination(this.outputChannelName);
                this.outputChannelName = null;
            }
        }
    }
    return this.outputChannel;
}
```
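The method above resolves the channel lazily and only once, using a check, a lock, and a second check. A stand-alone sketch of the same pattern follows; LazyOutputChannel and its resolver function are hypothetical, and the fields are declared volatile here as a conservative choice for the sketch.

```java
import java.util.function.Function;

// Hypothetical holder that resolves a channel by name on first access only.
class LazyOutputChannel<C> {
    private volatile String channelName;
    private volatile C channel;
    private final Function<String, C> resolver;

    LazyOutputChannel(String channelName, Function<String, C> resolver) {
        this.channelName = channelName;
        this.resolver = resolver;
    }

    C get() {
        // First check without locking: the common case after resolution.
        if (this.channelName != null) {
            synchronized (this) {
                // Second check under the lock: another thread may have resolved it already.
                if (this.channelName != null) {
                    this.channel = resolver.apply(this.channelName);
                    // Clearing the name marks the channel as resolved.
                    this.channelName = null;
                }
            }
        }
        return this.channel;
    }
}
```

Later calls skip the lock entirely and return the cached channel.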

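MessagingTemplate extends GenericMessagingTemplate, and the message is actually sent by its doSend() method.

The MessageChannel instance is a DirectChannel, so the message-sending flow described earlier is reused: the dispatcher class MessageDispatcher hands the message to a MessageHandler.

The message handler for this DirectChannel is StreamListenerMessageHandler: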

```java
public class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler {

    private final InvocableHandlerMethod invocableHandlerMethod;

    private final boolean copyHeaders;

    StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod,
            boolean copyHeaders, String[] notPropagatedHeaders) {
        super();
        this.invocableHandlerMethod = invocableHandlerMethod;
        this.copyHeaders = copyHeaders;
        this.setNotPropagatedHeaders(notPropagatedHeaders);
    }

    @Override
    protected boolean shouldCopyRequestHeaders() {
        return this.copyHeaders;
    }

    public boolean isVoid() {
        return this.invocableHandlerMethod.isVoid();
    }

    @Override
    protected Object handleRequestMessage(Message<?> requestMessage) {
        try {
            // Invoke the listener method with the incoming message.
            return this.invocableHandlerMethod.invoke(requestMessage);
        }
        catch (Exception e) {
            if (e instanceof MessagingException) {
                throw (MessagingException) e;
            }
            else {
                throw new MessagingException(requestMessage,
                        "Exception thrown while invoking "
                                + this.invocableHandlerMethod.getShortLogMessage(),
                        e);
            }
        }
    }
}
```

InvocableHandlerMethod uses Java reflection to perform the callback. StreamListenerMessageHandler is wired to the methods annotated with @StreamListener.
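The reflective callback can be sketched with nothing but java.lang.reflect. SketchInvocableMethod and GreetingListener are hypothetical classes for illustration; the real InvocableHandlerMethod additionally resolves and converts method arguments, which this sketch leaves out.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical, minimal counterpart of a reflectively invoked handler method.
class SketchInvocableMethod {
    private final Object bean;
    private final Method method;

    SketchInvocableMethod(Object bean, String methodName, Class<?> parameterType) {
        this.bean = bean;
        try {
            // Look up the public listener method once, at construction time.
            this.method = bean.getClass().getMethod(methodName, parameterType);
        }
        catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such listener method: " + methodName, e);
        }
    }

    Object invoke(Object payload) {
        try {
            // The actual callback: call the stored method on the stored bean.
            return method.invoke(bean, payload);
        }
        catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(
                    "Exception thrown while invoking " + method.getName(), e);
        }
    }
}

// Hypothetical listener bean used to exercise the sketch.
class GreetingListener {
    public String receive(String payload) {
        return "received " + payload;
    }
}
```

The handler never needs to know the listener's type at compile time; it only keeps the target object and the Method to call.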

The afterSingletonsInstantiated method of StreamListenerAnnotationBeanPostProcessor:

```java
@Override
public final void afterSingletonsInstantiated() {
    this.injectAndPostProcessDependencies();
    EvaluationContext evaluationContext = IntegrationContextUtils
            .getEvaluationContext(this.applicationContext.getBeanFactory());
    for (Map.Entry<String, List<StreamListenerHandlerMethodMapping>> mappedBindingEntry : this.mappedListenerMethods
            .entrySet()) {
        ArrayList<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper> handlers;
        handlers = new ArrayList<>();
        for (StreamListenerHandlerMethodMapping mapping : mappedBindingEntry
                .getValue()) {
            // Build the reflective method wrapper and the handler around it.
            final InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory
                    .createInvocableHandlerMethod(mapping.getTargetBean(),
                            checkProxy(mapping.getMethod(), mapping.getTargetBean()));
            StreamListenerMessageHandler streamListenerMessageHandler = new StreamListenerMessageHandler(
                    invocableHandlerMethod,
                    resolveExpressionAsBoolean(mapping.getCopyHeaders(),
                            "copyHeaders"),
                    this.springIntegrationProperties
                            .getMessageHandlerNotPropagatedHeaders());
            streamListenerMessageHandler
                    .setApplicationContext(this.applicationContext);
            streamListenerMessageHandler
                    .setBeanFactory(this.applicationContext.getBeanFactory());
            if (StringUtils.hasText(mapping.getDefaultOutputChannel())) {
                streamListenerMessageHandler
                        .setOutputChannelName(mapping.getDefaultOutputChannel());
            }
            streamListenerMessageHandler.afterPropertiesSet();
            if (StringUtils.hasText(mapping.getCondition())) {
                String conditionAsString = resolveExpressionAsString(
                        mapping.getCondition(), "condition");
                Expression condition = SPEL_EXPRESSION_PARSER
                        .parseExpression(conditionAsString);
                handlers.add(
                        new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                                condition, streamListenerMessageHandler));
            }
            else {
                handlers.add(
                        new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                                null, streamListenerMessageHandler));
            }
        }
        if (handlers.size() > 1) {
            for (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper handler : handlers) {
                Assert.isTrue(handler.isVoid(),
                        StreamListenerErrorMessages.MULTIPLE_VALUE_RETURNING_METHODS);
            }
        }
        AbstractReplyProducingMessageHandler handler;
        if (handlers.size() > 1 || handlers.get(0).getCondition() != null) {
            handler = new DispatchingStreamListenerMessageHandler(handlers,
                    evaluationContext);
        }
        else {
            handler = handlers.get(0).getStreamListenerMessageHandler();
        }
        handler.setApplicationContext(this.applicationContext);
        handler.setChannelResolver(this.binderAwareChannelResolver);
        handler.afterPropertiesSet();
        this.applicationContext.getBeanFactory().registerSingleton(
                handler.getClass().getSimpleName() + handler.hashCode(), handler);
        // Subscribe the handler to the channel named by the binding.
        this.applicationContext
                .getBean(mappedBindingEntry.getKey(), SubscribableChannel.class)
                .subscribe(handler);
    }
    this.mappedListenerMethods.clear();
}
```

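Once all singletons managed by the Spring container have been initialized, the method iterates over the StreamListenerHandlerMethodMapping entries and creates and initializes an InvocableHandlerMethod and a StreamListenerMessageHandler for each one.

StreamListenerHandlerMethodMapping stores the mapping between StreamListener and InvocableHandlerMethod. That mapping is created in the postProcessAfterInitialization() method of StreamListenerAnnotationBeanPostProcessor: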

```java
@Override
public final Object postProcessAfterInitialization(Object bean, final String beanName)
        throws BeansException {
    Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)
            : bean.getClass();
    Method[] uniqueDeclaredMethods = ReflectionUtils
            .getUniqueDeclaredMethods(targetClass);
    for (Method method : uniqueDeclaredMethods) {
        StreamListener streamListener = AnnotatedElementUtils
                .findMergedAnnotation(method, StreamListener.class);
        if (streamListener != null && !method.isBridge()) {
            // Defer the real work; the callback is run later.
            this.streamListenerCallbacks.add(() -> {
                Assert.isTrue(method.getAnnotation(Input.class) == null,
                        StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
                this.doPostProcess(streamListener, method, bean);
            });
        }
    }
    return bean;
}

private void doPostProcess(StreamListener streamListener, Method method,
        Object bean) {
    streamListener = postProcessAnnotation(streamListener, method);
    Optional<StreamListenerSetupMethodOrchestrator> orchestratorOptional;
    orchestratorOptional = this.streamListenerSetupMethodOrchestrators.stream()
            .filter(t -> t.supports(method)).findFirst();
    Assert.isTrue(orchestratorOptional.isPresent(),
            "A matching StreamListenerSetupMethodOrchestrator must be present");
    StreamListenerSetupMethodOrchestrator streamListenerSetupMethodOrchestrator = orchestratorOptional
            .get();
    streamListenerSetupMethodOrchestrator
            .orchestrateStreamListenerSetupMethod(streamListener, method, bean);
}

// The two methods below belong to the orchestrator implementation.
@Override
public void orchestrateStreamListenerSetupMethod(StreamListener streamListener,
        Method method, Object bean) {
    String methodAnnotatedInboundName = streamListener.value();

    String methodAnnotatedOutboundName = StreamListenerMethodUtils
            .getOutboundBindingTargetName(method);
    int inputAnnotationCount = StreamListenerMethodUtils
            .inputAnnotationCount(method);
    int outputAnnotationCount = StreamListenerMethodUtils
            .outputAnnotationCount(method);
    boolean isDeclarative = checkDeclarativeMethod(method,
            methodAnnotatedInboundName, methodAnnotatedOutboundName);
    StreamListenerMethodUtils.validateStreamListenerMethod(method,
            inputAnnotationCount, outputAnnotationCount,
            methodAnnotatedInboundName, methodAnnotatedOutboundName,
            isDeclarative, streamListener.condition());
    if (isDeclarative) {
        StreamListenerParameterAdapter[] toSlpaArray;
        toSlpaArray = new StreamListenerParameterAdapter[this.streamListenerParameterAdapters
                .size()];
        Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(
                method, methodAnnotatedInboundName, this.applicationContext,
                this.streamListenerParameterAdapters.toArray(toSlpaArray));
        invokeStreamListenerResultAdapter(method, bean,
                methodAnnotatedOutboundName, adaptedInboundArguments);
    }
    else {
        registerHandlerMethodOnListenedChannel(method, streamListener, bean);
    }
}

private void registerHandlerMethodOnListenedChannel(Method method,
        StreamListener streamListener, Object bean) {
    Assert.hasText(streamListener.value(), "The binding name cannot be null");
    if (!StringUtils.hasText(streamListener.value())) {
        throw new BeanInitializationException(
                "A bound component name must be specified");
    }
    final String defaultOutputChannel = StreamListenerMethodUtils
            .getOutboundBindingTargetName(method);
    if (Void.TYPE.equals(method.getReturnType())) {
        Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel),
                "An output channel cannot be specified for a method that does not return a value");
    }
    else {
        Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel),
                "An output channel must be specified for a method that can return a value");
    }
    StreamListenerMethodUtils.validateStreamListenerMessageHandler(method);
    // Record the mapping under the binding name for afterSingletonsInstantiated.
    StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add(
            streamListener.value(),
            new StreamListenerHandlerMethodMapping(bean, method,
                    streamListener.condition(), defaultOutputChannel,
                    streamListener.copyHeaders()));
}
```

The call to StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add creates and stores the StreamListenerHandlerMethodMapping.
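The two phases, first recording annotated methods per binding name and later invoking them for messages on that binding, can be sketched without Spring. Everything here is hypothetical: @SketchStreamListener stands in for @StreamListener, SketchListenerRegistry merges the post-processor and the handler into one class, and conditions, output channels and header copying are omitted.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical annotation naming the binding a method listens on.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchStreamListener {
    String value();
}

// Hypothetical counterpart of the bean-plus-method mapping entry.
class SketchListenerMapping {
    final Object bean;
    final Method method;

    SketchListenerMapping(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
    }
}

class SketchListenerRegistry {
    private final Map<String, List<SketchListenerMapping>> mappings = new LinkedHashMap<>();

    // Phase 1: scan a bean and record every annotated method under its binding name.
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            SketchStreamListener listener = method.getAnnotation(SketchStreamListener.class);
            if (listener != null && !method.isBridge()) {
                mappings.computeIfAbsent(listener.value(), key -> new ArrayList<>())
                        .add(new SketchListenerMapping(bean, method));
            }
        }
    }

    // Phase 2: deliver a payload to every method mapped to the binding; returns the count.
    int dispatch(String binding, Object payload) {
        List<SketchListenerMapping> targets =
                mappings.getOrDefault(binding, Collections.emptyList());
        for (SketchListenerMapping mapping : targets) {
            try {
                mapping.method.invoke(mapping.bean, payload);
            }
            catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        return targets.size();
    }
}

// Hypothetical listener bean with one annotated method.
class OrderReceiver {
    final List<String> received = new ArrayList<>();

    @SketchStreamListener("input")
    public void receive(String payload) {
        received.add(payload);
    }
}
```

Keeping the scan separate from the dispatch mirrors the split between postProcessAfterInitialization, which only records mappings, and afterSingletonsInstantiated, which turns them into subscribed handlers.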

This is how RocketMQ is used through the Spring Cloud Stream message model. Alternatively, you can use the RocketMQ component integrated with Spring Boot.


Author: 周杰倫本人
Link: https://juejin.cn/post/7018474907042316295

