阅读 77

ThreadLocal你懂了,你还懂TransmittableThreadLocal嘛?

前言

在上一篇文章中,给大家讲述了如何使用TransmittableThreadLocal解决线程间上下文传递的问题,今天这篇文章我们来看看TransmittableThreadLocal是如何实现线程间上下文传递的,它使用了什么方式解决了InheritableThreadLocal都没有解决的线程复用导致上下文污染的问题;

set()操作

我们可以进入TransmittableThreadLocal源码查看set()方法:

   public final void set(T value) {         if (!this.disableIgnoreNullValueSemantics && value == null) {             this.remove();         } else {             // 调用父类set()方法,其实就是设置线程属性inheritableThreadLocals             super.set(value);             // 把自己放进一个WeakHashMap中;             this.addThisToHolder();         }     } 复制代码

因为TransmittableThreadLocal继承了InheritableThreadLocal类,所以super.set()其实是将value值放进线程属性inheritableThreadLocals中;

addThisToHolder()方法中,其实就是在inheritableThreadLocals中再放进一个键值对,key对应的就是TransmittableThreadLocal对象本身,value就是null,相当于把Holder当成Set用;

set()方法执行完毕后,当前线程中的inheritableThreadLocals属性中有两个键值对;假如我们调用了set("china"),当前线程中的inheritableThreadLocals包含,Holder也是一个TransmittableThreadLocal对象:

TransmittableThreadLocal:"china", Holder:{   TransmittableThreadLocal:null } 复制代码

创建新的线程

Thread类中,我们发现在构造函数中有这么一段代码:

Thread parent = currentThread(); if (inheritThreadLocals && parent.inheritableThreadLocals != null)     this.inheritableThreadLocals =                 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); 复制代码

当我们在应用程序中调用new Thread()时,新创建出来的线程实例会复制父线程的inheritableThreadLocals属性,这其实就是InheritableThreadLocal的原理;根据这个原理,我们可以知道线程池中的所有线程都会复制父线程的上下文;

包装任务

在使用TransmittableThreadLocal时,我们都会使用这么一个方法来包装任务:TtlRunnable.get(()->{}),我们也应该猜到了,它其实就是对我们要做的任务做了一层增强;

Transmitter.capture()

TtlRunnable中,我们可以发现这样一段代码:

private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture()); 复制代码

它在父线程中调用TtlRunnable.get()的时候就会执行,我们看看它做了什么操作:

        @NonNull         public static Object capture() {             HashMap<TransmittableThreadLocal.Transmitter.Transmittee<Object, Object>, Object> transmittee2Value = TransmittableThreadLocal.newHashMap(transmitteeSet.size());             Iterator var1 = transmitteeSet.iterator();              while(var1.hasNext()) {                 TransmittableThreadLocal.Transmitter.Transmittee transmittee = (TransmittableThreadLocal.Transmitter.Transmittee)var1.next();                  try {                     transmittee2Value.put(transmittee, transmittee.capture());                 } catch (Throwable var4) {                     if (TransmittableThreadLocal.logger.isLoggable(Level.WARNING)) {                         TransmittableThreadLocal.logger.log(Level.WARNING, "exception when Transmitter.capture for transmittee " + transmittee + "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + var4, var4);                     }                 }             }              return new TransmittableThreadLocal.Transmitter.Snapshot(transmittee2Value);         } 复制代码

其实这是一个统一的入口方法,它遍历了transmitteeSet中的所有TransmittableThreadLocal.Transmitter.Transmittee实例对象,并依次调用了它们的capture()方法,最后把结果保存起来了,并最终返回给了capturedRef

我们依次看一下具体做了啥操作:

public HashMap<TransmittableThreadLocal<Object>, Object> capture() {     HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = TransmittableThreadLocal.newHashMap(((WeakHashMap)TransmittableThreadLocal.holder.get()).size());     // 在父线程中调用holder.get(),并遍历所有的key     Iterator var2 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();     while(var2.hasNext()) {         // 取出key值         TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var2.next();         // 拷贝key值         ttl2Value.put(threadLocal, threadLocal.copyValue());     }     return ttl2Value; } 复制代码

假如之前在父线程中的inheritableThreadLocals中存储的内容如下:

TransmittableThreadLocal:"china", Holder:{   TransmittableThreadLocal:null } 复制代码

那么最终ttl2Value中存储的值就是:

TransmittableThreadLocal:"china" 复制代码

第二个capture()可以不用关心,其实是用来收集ThreadLocal中的数据,这是TransmittableThreadLocal为了兼容ThreadLocal做的处理;TransmittableThreadLocal本身提供了registerThreadLocal这样的方法来兼容ThreadLocal

所以上述代码其实就是在创建任务时,从父线程中收集TransmittableThreadLocalThreadLocal的信息,并保存到每一个任务当中;

Transmitter.replay()

前面的数据收集工作全部做完后,我们的任务就被扔到线程池中执行,我们可以看一下包装后的run()方法:

    public void run() {         // 取出父线程中的快照信息         Object captured = this.capturedRef.get();         if (captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) {             // 把该信息重放到当前线程的上下文中             Object backup = Transmitter.replay(captured);             try {                 // 执行业务代码                 this.runnable.run();             } finally {                 // 恢复现场                 Transmitter.restore(backup);             }          } else {             throw new IllegalStateException("TTL value reference is released after run!");         }     } 复制代码

我们看一下Transmitter.replay()做了啥操作,同样的还是有一个统一的入口,那么我们直接看具体的replay()操作:

@NonNull public HashMap<TransmittableThreadLocal<Object>, Object> replay(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {     // 在子线程中执行,Holder已经把父线程的inheritableThreadLocals属性复制过来了     HashMap<TransmittableThreadLocal<Object>, Object> backup = TransmittableThreadLocal.newHashMap(((WeakHashMap)TransmittableThreadLocal.holder.get()).size());     // 遍历Holder中的key     Iterator iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();     while(iterator.hasNext()) {         // 取出key值         TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next();         // 通过key值找到之前设置的value值,因为子线程已经复制了父线程的inheritableThreadLocals属性         backup.put(threadLocal, threadLocal.get());         // 如果备份数据中没有找到这个key值,那么就做删除操作         if (!captured.containsKey(threadLocal)) {             iterator.remove();             threadLocal.superRemove();         }     }     // 把value设置到当前线程中     TransmittableThreadLocal.Transmitter.setTtlValuesTo(captured);     TransmittableThreadLocal.doExecuteCallback(true);     // 返回备份数据     return backup; } 复制代码

1.借助new Thread()会拷贝inheritableThreadLocals属性的特性,可以直接在子线程中通过Holder获取TransmittableThreadLocal对象,并通过TransmittableThreadLocal获取父线程中设置的value

2.子线程中的数据会和快照中的数据做一个比较,并删除无效数据;

3.把父线程中的快照数据设置进子线程中,这样就可以在子线程中获取TransmittableThreadLocal设置的value

Transmitter.restore()

我们可以看到在业务代码处理完毕后,还调用了Transmitter.restore(backup)的操作,我们来看一下里面都做了啥,同样也有一个统一的入口代码,我们直接略过看具体实现:

public void restore(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {     TransmittableThreadLocal.doExecuteCallback(false);     // 遍历Holder中的所有key     Iterator iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();     while(iterator.hasNext()) {         TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next();         // 通过与backup的对比判断在子线程中是否设置了额外的键值对进TransmittableThreadLocal中         // 删除额外设置的数据         if (!backup.containsKey(threadLocal)) {             iterator.remove();             threadLocal.superRemove();         }     }     // 最终把子线程上下文恢复到backup,保持和执行业务之前的上下文一致     TransmittableThreadLocal.Transmitter.setTtlValuesTo(backup); } 复制代码

Transmitter.restore()的恢复操作时非常关键的,它很好地隔离了每一个任务间的上下文数据,虽然使用的是同一个线程执行不同的任务,但是在任务执行完毕后,通过Transmitter.restore()操作恢复了子线程的上下文数据;

小结

TransmittableThreadLocal通过对执行任务的包装,对每一个任务都做了一层增强,在任务创建的时候capture()复制了一份父线程的数据,同时利用new Thread()特性,使用Holder解决了在父子线程中获取TransmittableThreadLocal实例的问题,使得在不同的线程中都能很方便地获取TransmittableThreadLocal和对应的value值。通过在任务执行之前在子线程中replay(captured)重放快照的方式把上下文数据设置进子线程中,并在业务执行完毕后使用restore(backup)恢复子线程上下文。


作者:梦想实现家_Z
链接:https://juejin.cn/post/7171354392199692295


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐