RocketMQ Binder: Integrating Message Subscription
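AbstractMessageChannelBinder defines the contract for creating a MessageProducer: when the Binder is initialized, the createConsumerEndpoint method is invoked.
RocketMQMessageChannelBinder implements it to create and initialize the RocketMQInboundChannelAdapter.
The createConsumerEndpoint method of RocketMQMessageChannelBinder: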
    @Override
    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
            String group,
            ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
            throws Exception {
        if (group == null || "".equals(group)) {
            throw new RuntimeException(
                    "'group must be configured for channel " + destination.getName());
        }

        RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
                consumerProperties, rocketBinderConfigurationProperties, this);
        listenerContainer.setConsumerGroup(group);
        listenerContainer.setTopic(destination.getName());
        listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
        listenerContainer.setSuspendCurrentQueueTimeMillis(
                consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
        listenerContainer.setDelayLevelWhenNextConsume(
                consumerProperties.getExtension().getDelayLevelWhenNextConsume());
        listenerContainer
                .setNameServer(rocketBinderConfigurationProperties.getNameServer());
        listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));

        RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
                listenerContainer, consumerProperties, instrumentationManager);

        topicInUse.put(destination.getName(), group);

        ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
                group, consumerProperties);
        if (consumerProperties.getMaxAttempts() > 1) {
            rocketInboundChannelAdapter
                    .setRetryTemplate(buildRetryTemplate(consumerProperties));
            rocketInboundChannelAdapter
                    .setRecoveryCallback(errorInfrastructure.getRecoverer());
        }
        else {
            rocketInboundChannelAdapter
                    .setErrorChannel(errorInfrastructure.getErrorChannel());
        }

        return rocketInboundChannelAdapter;
    }
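RocketMQInboundChannelAdapter is an adapter: it has to fit into Spring Framework's retry and callback mechanisms, and it subscribes to messages and converts their format. RocketMQListenerBindingContainer wraps the RocketMQ client API, and the adapter holds an instance of it.
RocketMQ offers two consumption modes: orderly and concurrent. The default listener for orderly consumption is DefaultMessageListenerOrderly, and the default listener for concurrent consumption is DefaultMessageListenerConcurrently; they implement the RocketMQ client API's MessageListenerOrderly and MessageListenerConcurrently interfaces respectively. In either mode, every message the listener receives is passed on to the RocketMQListener callback.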
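To make the "two modes, one callback" idea concrete, here is a minimal plain-Java sketch with no RocketMQ dependency. SketchMessageListener and ConsumeModesSketch are hypothetical names invented for this illustration, and the returned strings only mirror the names of the client's ConsumeOrderlyStatus and ConsumeConcurrentlyStatus values; the real container is more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RocketMQListener: the one callback both modes end up calling.
interface SketchMessageListener {
    void onMessage(String message);
}

// Hypothetical stand-in for the listener container; not the binder's real class.
class ConsumeModesSketch {
    private final SketchMessageListener delegate;

    ConsumeModesSketch(SketchMessageListener delegate) {
        this.delegate = delegate;
    }

    // Orderly mode: on failure the current queue is paused briefly and retried in place.
    String consumeOrderly(List<String> messages) {
        try {
            for (String message : messages) {
                delegate.onMessage(message);
            }
            return "SUCCESS";
        } catch (RuntimeException e) {
            return "SUSPEND_CURRENT_QUEUE_A_MOMENT";
        }
    }

    // Concurrent mode: on failure the batch is handed back for later redelivery.
    String consumeConcurrently(List<String> messages) {
        try {
            for (String message : messages) {
                delegate.onMessage(message);
            }
            return "CONSUME_SUCCESS";
        } catch (RuntimeException e) {
            return "RECONSUME_LATER";
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        ConsumeModesSketch container = new ConsumeModesSketch(received::add);
        System.out.println(container.consumeOrderly(Arrays.asList("a", "b")));
        System.out.println(container.consumeConcurrently(Arrays.asList("c")));
        System.out.println(received);
    }
}
```

The two failure statuses are where settings such as suspendCurrentQueueTimeMillis (orderly) and delayLevelWhenNextConsume (concurrent) from createConsumerEndpoint come into play.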
RocketMQInboundChannelAdapter creates and initializes the RocketMQListener implementation class.
RocketMQInboundChannelAdapter:
    @Override
    protected void onInit() {
        if (consumerProperties == null
                || !consumerProperties.getExtension().getEnabled()) {
            return;
        }
        super.onInit();
        if (this.retryTemplate != null) {
            Assert.state(getErrorChannel() == null,
                    "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
                            + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
                            + "send an error message when retries are exhausted");
        }

        BindingRocketMQListener listener = new BindingRocketMQListener();
        rocketMQListenerContainer.setRocketMQListener(listener);

        if (retryTemplate != null) {
            this.retryTemplate.registerListener(listener);
        }

        try {
            rocketMQListenerContainer.afterPropertiesSet();
        }
        catch (Exception e) {
            log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
            throw new IllegalArgumentException(
                    "rocketMQListenerContainer init error: " + e.getMessage(), e);
        }

        instrumentationManager.addHealthInstrumentation(
                new Instrumentation(rocketMQListenerContainer.getTopic()
                        + rocketMQListenerContainer.getConsumerGroup()));
    }

    protected class BindingRocketMQListener
            implements RocketMQListener<Message>, RetryListener {

        @Override
        public void onMessage(Message message) {
            boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
            if (enableRetry) {
                RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
                    RocketMQInboundChannelAdapter.this.sendMessage(message);
                    return null;
                }, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
            }
            else {
                RocketMQInboundChannelAdapter.this.sendMessage(message);
            }
        }

        @Override
        public <T, E extends Throwable> boolean open(RetryContext context,
                RetryCallback<T, E> callback) {
            return true;
        }

        @Override
        public <T, E extends Throwable> void close(RetryContext context,
                RetryCallback<T, E> callback, Throwable throwable) {
        }

        @Override
        public <T, E extends Throwable> void onError(RetryContext context,
                RetryCallback<T, E> callback, Throwable throwable) {
        }
    }
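After DefaultMessageListenerOrderly receives a RocketMQ message, it first calls back the onMessage method of BindingRocketMQListener, which then calls the sendMessage method inherited from the parent class of RocketMQInboundChannelAdapter to send the message to the DirectChannel.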
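The retry branch of onMessage can be pictured with a small plain-Java sketch: try the send up to a maximum number of attempts, and hand the last failure to a recovery callback once attempts are exhausted. RetrySketch and its execute signature are assumptions made for this illustration, not Spring Retry's API, and the sketch leaves out back-off policies and retry listeners.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Hypothetical, simplified model of "retry, then recover"; not Spring Retry itself.
class RetrySketch {

    // Runs the action up to maxAttempts times; if every attempt fails,
    // the last exception is passed to the recovery function instead of being rethrown.
    static <T> T execute(int maxAttempts, Callable<T> action, Function<Exception, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        return recovery.apply(last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = RetrySketch.<String>execute(3, () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "delivered";
        }, e -> "handed to recovery: " + e.getMessage());
        System.out.println(result + " (attempts: " + calls[0] + ")");
    }
}
```

This matches the split made in createConsumerEndpoint: with maxAttempts greater than 1 the adapter gets a retry template plus a recovery callback, otherwise failures go straight to the error channel.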
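Spring Cloud Stream uses the same messaging model for receiving as for sending: a message received in the Binder is first sent to a MessageChannel, and the subscribed MessageChannel forwards it through its Dispatcher to the matching MessageHandler.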
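The channel, dispatcher, and handler roles can be sketched in a few lines of plain Java. DirectChannelSketch is a hypothetical class written for this illustration; it only imitates the general behaviour of a direct channel (delivery on the sender's thread, one handler per message, round-robin when several handlers are subscribed) and is not Spring Integration's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified channel: subscribe handlers, then dispatch each message to one of them.
class DirectChannelSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Dispatches synchronously on the caller's thread to exactly one handler,
    // rotating through the subscribed handlers.
    boolean send(String message) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        Consumer<String> handler = handlers.get(next % handlers.size());
        next++;
        handler.accept(message);
        return true;
    }

    public static void main(String[] args) {
        DirectChannelSketch channel = new DirectChannelSketch();
        channel.subscribe(m -> System.out.println("handler-1 got " + m));
        channel.subscribe(m -> System.out.println("handler-2 got " + m));
        channel.send("m1");
        channel.send("m2");
        channel.send("m3");
    }
}
```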
getOutputChannel() of MessageProducerSupport, the parent class of RocketMQInboundChannelAdapter, returns the MessageChannel, which is the DirectChannel passed in when the RocketMQ Binder was initialized.
The getOutputChannel method of MessageProducerSupport:
    @Override
    public MessageChannel getOutputChannel() {
        if (this.outputChannelName != null) {
            synchronized (this) {
                if (this.outputChannelName != null) {
                    this.outputChannel = getChannelResolver().resolveDestination(this.outputChannelName);
                    this.outputChannelName = null;
                }
            }
        }
        return this.outputChannel;
    }
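The method above resolves a channel name lazily, at most once, using a double-checked lock. A minimal plain-Java sketch of that pattern follows; LazyChannelSketch and its resolver function are hypothetical stand-ins for the real channel resolver, and the fields are declared volatile here as an assumption needed to make the pattern safe in a standalone example.

```java
import java.util.function.Function;

// Hypothetical holder that resolves a name to a channel object on first access only.
class LazyChannelSketch {
    private volatile String channelName;
    private volatile Object channel;
    private final Function<String, Object> resolver;

    LazyChannelSketch(String channelName, Function<String, Object> resolver) {
        this.channelName = channelName;
        this.resolver = resolver;
    }

    Object getChannel() {
        if (this.channelName != null) {             // fast path: skip the lock once resolved
            synchronized (this) {
                if (this.channelName != null) {     // re-check under the lock
                    this.channel = resolver.apply(this.channelName);
                    this.channelName = null;        // marks resolution as done
                }
            }
        }
        return this.channel;
    }

    public static void main(String[] args) {
        int[] lookups = {0};
        LazyChannelSketch holder = new LazyChannelSketch("input", name -> {
            lookups[0]++;
            return "channel:" + name;
        });
        System.out.println(holder.getChannel());
        System.out.println(holder.getChannel());
        System.out.println("lookups = " + lookups[0]);
    }
}
```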
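MessagingTemplate extends GenericMessagingTemplate, whose doSend() method actually sends the message.
The MessageChannel instance is a DirectChannel, so the message-sending flow covered earlier is reused: the MessageDispatcher dispatches the message to a MessageHandler.
The message handler attached to the DirectChannel is StreamListenerMessageHandler: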
    public class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler {

        private final InvocableHandlerMethod invocableHandlerMethod;

        private final boolean copyHeaders;

        StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod,
                boolean copyHeaders, String[] notPropagatedHeaders) {
            super();
            this.invocableHandlerMethod = invocableHandlerMethod;
            this.copyHeaders = copyHeaders;
            this.setNotPropagatedHeaders(notPropagatedHeaders);
        }

        @Override
        protected boolean shouldCopyRequestHeaders() {
            return this.copyHeaders;
        }

        public boolean isVoid() {
            return this.invocableHandlerMethod.isVoid();
        }

        @Override
        protected Object handleRequestMessage(Message<?> requestMessage) {
            try {
                return this.invocableHandlerMethod.invoke(requestMessage);
            }
            catch (Exception e) {
                if (e instanceof MessagingException) {
                    throw (MessagingException) e;
                }
                else {
                    throw new MessagingException(requestMessage,
                            "Exception thrown while invoking "
                                    + this.invocableHandlerMethod.getShortLogMessage(),
                            e);
                }
            }
        }
    }
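InvocableHandlerMethod uses Java reflection to perform the callback. StreamListenerMessageHandler is tied to the methods annotated with @StreamListener in StreamListenerAnnotationBeanPostProcessor.
The afterSingletonsInstantiated method of StreamListenerAnnotationBeanPostProcessor: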
    @Override
    public final void afterSingletonsInstantiated() {
        this.injectAndPostProcessDependencies();
        EvaluationContext evaluationContext = IntegrationContextUtils
                .getEvaluationContext(this.applicationContext.getBeanFactory());
        for (Map.Entry<String, List<StreamListenerHandlerMethodMapping>> mappedBindingEntry : this.mappedListenerMethods
                .entrySet()) {
            ArrayList<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper> handlers;
            handlers = new ArrayList<>();
            for (StreamListenerHandlerMethodMapping mapping : mappedBindingEntry
                    .getValue()) {
                final InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory
                        .createInvocableHandlerMethod(mapping.getTargetBean(),
                                checkProxy(mapping.getMethod(), mapping.getTargetBean()));
                StreamListenerMessageHandler streamListenerMessageHandler = new StreamListenerMessageHandler(
                        invocableHandlerMethod,
                        resolveExpressionAsBoolean(mapping.getCopyHeaders(), "copyHeaders"),
                        this.springIntegrationProperties
                                .getMessageHandlerNotPropagatedHeaders());
                streamListenerMessageHandler
                        .setApplicationContext(this.applicationContext);
                streamListenerMessageHandler
                        .setBeanFactory(this.applicationContext.getBeanFactory());
                if (StringUtils.hasText(mapping.getDefaultOutputChannel())) {
                    streamListenerMessageHandler
                            .setOutputChannelName(mapping.getDefaultOutputChannel());
                }
                streamListenerMessageHandler.afterPropertiesSet();
                if (StringUtils.hasText(mapping.getCondition())) {
                    String conditionAsString = resolveExpressionAsString(
                            mapping.getCondition(), "condition");
                    Expression condition = SPEL_EXPRESSION_PARSER
                            .parseExpression(conditionAsString);
                    handlers.add(
                            new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                                    condition, streamListenerMessageHandler));
                }
                else {
                    handlers.add(
                            new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                                    null, streamListenerMessageHandler));
                }
            }
            if (handlers.size() > 1) {
                for (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper handler : handlers) {
                    Assert.isTrue(handler.isVoid(),
                            StreamListenerErrorMessages.MULTIPLE_VALUE_RETURNING_METHODS);
                }
            }
            AbstractReplyProducingMessageHandler handler;

            if (handlers.size() > 1 || handlers.get(0).getCondition() != null) {
                handler = new DispatchingStreamListenerMessageHandler(handlers,
                        evaluationContext);
            }
            else {
                handler = handlers.get(0).getStreamListenerMessageHandler();
            }
            handler.setApplicationContext(this.applicationContext);
            handler.setChannelResolver(this.binderAwareChannelResolver);
            handler.afterPropertiesSet();
            this.applicationContext.getBeanFactory().registerSingleton(
                    handler.getClass().getSimpleName() + handler.hashCode(), handler);
            this.applicationContext
                    .getBean(mappedBindingEntry.getKey(), SubscribableChannel.class)
                    .subscribe(handler);
        }
        this.mappedListenerMethods.clear();
    }
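After all singletons managed by the Spring container have been initialized, the method iterates over the StreamListenerHandlerMethodMapping entries and creates and initializes an InvocableHandlerMethod and a StreamListenerMessageHandler for each one.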
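The reflective callback at the heart of this step can be illustrated with the JDK's own java.lang.reflect API. ReflectiveHandlerSketch and GreetingBean are hypothetical classes made up for this sketch; Spring's InvocableHandlerMethod additionally resolves arguments and converts payloads, which is omitted here.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical listener bean whose method is called reflectively.
class GreetingBean {
    public String greet(String payload) {
        return "hello " + payload;
    }
}

// Hypothetical handler: keeps a (bean, method) pair and invokes it for each message payload.
class ReflectiveHandlerSketch {
    private final Object bean;
    private final Method method;

    private ReflectiveHandlerSketch(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
    }

    // Looks up a public method that takes a single String parameter.
    static ReflectiveHandlerSketch forMethod(Object bean, String methodName) {
        try {
            return new ReflectiveHandlerSketch(bean, bean.getClass().getMethod(methodName, String.class));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No method " + methodName + "(String)", e);
        }
    }

    // Invokes the target method and wraps failures, loosely mirroring handleRequestMessage.
    Object handle(String payload) {
        try {
            return method.invoke(bean, payload);
        } catch (InvocationTargetException e) {
            throw new IllegalStateException("Exception thrown while invoking " + method.getName(), e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Cannot access " + method.getName(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveHandlerSketch handler = forMethod(new GreetingBean(), "greet");
        System.out.println(handler.handle("RocketMQ"));
    }
}
```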
StreamListenerHandlerMethodMapping holds the mapping between a StreamListener and its InvocableHandlerMethod. The mapping is created in the postProcessAfterInitialization() method of StreamListenerAnnotationBeanPostProcessor:
    @Override
    public final Object postProcessAfterInitialization(Object bean, final String beanName)
            throws BeansException {
        Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)
                : bean.getClass();
        Method[] uniqueDeclaredMethods = ReflectionUtils
                .getUniqueDeclaredMethods(targetClass);
        for (Method method : uniqueDeclaredMethods) {
            StreamListener streamListener = AnnotatedElementUtils
                    .findMergedAnnotation(method, StreamListener.class);
            if (streamListener != null && !method.isBridge()) {
                this.streamListenerCallbacks.add(() -> {
                    Assert.isTrue(method.getAnnotation(Input.class) == null,
                            StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
                    this.doPostProcess(streamListener, method, bean);
                });
            }
        }
        return bean;
    }

    private void doPostProcess(StreamListener streamListener, Method method,
            Object bean) {
        streamListener = postProcessAnnotation(streamListener, method);
        Optional<StreamListenerSetupMethodOrchestrator> orchestratorOptional;
        orchestratorOptional = this.streamListenerSetupMethodOrchestrators.stream()
                .filter(t -> t.supports(method)).findFirst();
        Assert.isTrue(orchestratorOptional.isPresent(),
                "A matching StreamListenerSetupMethodOrchestrator must be present");
        StreamListenerSetupMethodOrchestrator streamListenerSetupMethodOrchestrator = orchestratorOptional
                .get();
        streamListenerSetupMethodOrchestrator
                .orchestrateStreamListenerSetupMethod(streamListener, method, bean);
    }

    @Override
    public void orchestrateStreamListenerSetupMethod(StreamListener streamListener,
            Method method, Object bean) {
        String methodAnnotatedInboundName = streamListener.value();

        String methodAnnotatedOutboundName = StreamListenerMethodUtils
                .getOutboundBindingTargetName(method);
        int inputAnnotationCount = StreamListenerMethodUtils
                .inputAnnotationCount(method);
        int outputAnnotationCount = StreamListenerMethodUtils
                .outputAnnotationCount(method);
        boolean isDeclarative = checkDeclarativeMethod(method,
                methodAnnotatedInboundName, methodAnnotatedOutboundName);
        StreamListenerMethodUtils.validateStreamListenerMethod(method,
                inputAnnotationCount, outputAnnotationCount,
                methodAnnotatedInboundName, methodAnnotatedOutboundName,
                isDeclarative, streamListener.condition());
        if (isDeclarative) {
            StreamListenerParameterAdapter[] toSlpaArray;
            toSlpaArray = new StreamListenerParameterAdapter[this.streamListenerParameterAdapters
                    .size()];
            Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(
                    method, methodAnnotatedInboundName, this.applicationContext,
                    this.streamListenerParameterAdapters.toArray(toSlpaArray));
            invokeStreamListenerResultAdapter(method, bean,
                    methodAnnotatedOutboundName, adaptedInboundArguments);
        }
        else {
            registerHandlerMethodOnListenedChannel(method, streamListener, bean);
        }
    }

    private void registerHandlerMethodOnListenedChannel(Method method,
            StreamListener streamListener, Object bean) {
        Assert.hasText(streamListener.value(), "The binding name cannot be null");
        if (!StringUtils.hasText(streamListener.value())) {
            throw new BeanInitializationException(
                    "A bound component name must be specified");
        }
        final String defaultOutputChannel = StreamListenerMethodUtils
                .getOutboundBindingTargetName(method);
        if (Void.TYPE.equals(method.getReturnType())) {
            Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel),
                    "An output channel cannot be specified for a method that does not return a value");
        }
        else {
            Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel),
                    "An output channel must be specified for a method that can return a value");
        }
        StreamListenerMethodUtils.validateStreamListenerMessageHandler(method);
        StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add(
                streamListener.value(),
                new StreamListenerHandlerMethodMapping(bean, method,
                        streamListener.condition(), defaultOutputChannel,
                        streamListener.copyHeaders()));
    }
The call to StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add creates and stores the StreamListenerHandlerMethodMapping.
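The registration step boils down to scanning a bean's methods for an annotation and grouping the hits by binding name. The plain-Java sketch below shows that idea only; the @OnBinding annotation, MappingRegistrySketch, and SampleListeners are hypothetical names invented for the example, and the validation, conditions, and output-channel handling of the real post-processor are left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical annotation playing the role of @StreamListener in this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnBinding {
    String value();
}

// Hypothetical bean with two listener methods on the same binding and one plain method.
class SampleListeners {
    @OnBinding("input")
    public void onOrder(String payload) { }

    @OnBinding("input")
    public void onAudit(String payload) { }

    public void helper() { }
}

// Hypothetical registry: binding name -> annotated methods, filled while beans are scanned.
class MappingRegistrySketch {
    private final Map<String, List<Method>> mappedListenerMethods = new LinkedHashMap<>();

    // Collects every annotated, non-bridge method of the bean under its binding name.
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            OnBinding annotation = method.getAnnotation(OnBinding.class);
            if (annotation != null && !method.isBridge()) {
                mappedListenerMethods
                        .computeIfAbsent(annotation.value(), key -> new ArrayList<>())
                        .add(method);
            }
        }
    }

    // Returns the registered method names for a binding, sorted for stable output.
    List<String> methodNamesFor(String binding) {
        List<String> names = new ArrayList<>();
        for (Method method : mappedListenerMethods.getOrDefault(binding, Collections.emptyList())) {
            names.add(method.getName());
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        MappingRegistrySketch registry = new MappingRegistrySketch();
        registry.scan(new SampleListeners());
        System.out.println(registry.methodNamesFor("input"));
    }
}
```

Keeping registration (scan) separate from handler creation is what lets the real post-processor wait until all singletons exist before subscribing handlers to their channels.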
This is how RocketMQ is used through the Spring Cloud Stream messaging model; alternatively, you can use the RocketMQ component integrated with Spring Boot.
Author: 周杰倫本人
Link: https://juejin.cn/post/7018474907042316295