阅读 372

SkyWalking 自定义插件(Spring RabbitMQ)具体分析过程

这篇文章主要介绍了SkyWalking 自定义插件(Spring RabbitMQ)具体分析过程,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

SkyWalking 自定义插件(Spring RabbitMQ) 官方

RabbitMQ插件问题

skywalking官方提供的RabbitMQ插件存在缺陷,其只针对RabbitMQ官方原生Client实现扩展,但我们在项目中一般不直接使用原生Client,而是使用Spring RabitMQ Client,因Spring RabitMQ Consumer中存在跨线程操作,导致跟踪ID断链。

具体分析过程

1.官方插件源码的拦截点是原生Consumer的handleDelivery方法,源码如下:

2.而Spring RabbitMQ消费者的默认实现是BlockingQueueConsumer, handleDelivery核心逻辑是把消息放到内部的BlockingQueue队列,不做真正的消费处理,因此拦截此处无法关联到消费者逻辑,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) {
        ...
            try {
                if (BlockingQueueConsumer.this.abortStarted > 0) {
                    if (!BlockingQueueConsumer.this.queue.offer(
                            new Delivery(consumerTag, envelope, properties, body, this.queueName),
                            BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
 
                        Channel channelToClose = super.getChannel();
                        RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
                        // Defensive - should never happen
                        BlockingQueueConsumer.this.queue.clear();
                        if (!this.canceled) {
                            RabbitUtils.cancel(channelToClose, consumerTag);
                        }
                        try {
                            channelToClose.close();
                        catch (@SuppressWarnings("unused") TimeoutException e) {
                            // no-op
                    }
                }
                else {
                    BlockingQueueConsumer.this.queue
                            .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
            }
            catch (@SuppressWarnings("unused") InterruptedException e) {
                Thread.currentThread().interrupt();
            catch (Exception e) {
                BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
        }

3.真正的消费处理在SimpleMessageListenerContainer,SimpleMessageListenerContainer继承Runnable接口,在其run方法中while循环调用mainLoop方法,整体调用链路为

4.SimpleMessageListenerContainer.run() -> SimpleMessageListenerContainer.mainLoop() -> SimpleMessageListenerContainer.receiveAndExecute() -> SimpleMessageListenerContainer.doReceiveAndExecute() -> AbstractMessageListenerContainer.executeListener()。最终在executeListener中执行消费逻辑


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void executeListener(Channel channel, Object data) {
    ...
        try {
      // 执行消费逻辑
            doExecuteListener(channel, data);
            if (sample != null) {
                this.micrometerHolder.success(sample, data instanceof Message
                        ? ((Message) data).getMessageProperties().getConsumerQueue()
                        : queuesAsListString());
            }
        }
        catch (RuntimeException ex) {
        ....
        }
    }

实现自定义插件

从上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的拦截点
实现源码已放到码云仓库:https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/

效果展示

SkyWalking调用链路

logback日志

到此这篇关于SkyWalking 自定义插件(Spring RabbitMQ)的文章就介绍到这了

原文链接:https://blog.csdn.net/AnIllusion/article/details/122916505


    文章分类
    百科问答
    版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
    相关推荐