评论

收藏

[Sybase] ThreadLocal及其扩展

数据库 数据库 发布于:2021-12-31 13:22 | 阅读数:348 | 评论:0

更多内容关注微信公众号:fullstack888

ThreadLocal
ThreadLocal是线程本地变量,每个线程往这个ThreadLocal中读写是线程隔离,互相之间不会影响的。它提供了一种将可变数据通过每个线程有自己的独立副本从而实现线程封闭的机制。
Thread类有一个类型为ThreadLocal.ThreadLocalMap的实例变量threadLocals,也就是说每个线程有一个自己的ThreadLocalMap。ThreadLocalMap参照HashMap的实现,ThreadLocalMap的key为ThreadLocal,value为代码中放入的值(实际上key并不是ThreadLocal本身,而是它的一个弱引用)。每个线程在往某个ThreadLocal里塞值的时候,都会往自己的ThreadLocalMap里存,读也是以某个ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。



ThreadLocalMap
ThreadLocalMap是ThreadLocal的静态内部类。ThreadLocalMap提供了一种为ThreadLocal定制的高效实现,并且自带一种基于弱引用的垃圾清理机制。
ThreadLocalMap是类似HashMap实现的另一种map实现,有自己的key和value,key为ThreadLocal,value为代码中放入的值。和HashMap一样有一个table数据,有get和set方法,在key的hash冲突时往下寻找对应对象槽位。
ThreadLocalMap里的节点Entry不同于HashMap里面的Entry,继承弱引用,是如下定义的。


static class Entry extends WeakReference<java.lang.ThreadLocal<?>> {
// 往ThreadLocal里实际塞入的值
Object value;
Entry(java.lang.ThreadLocal<?> k, Object v) {
super(k);
value = v;
   }
}


为什么用弱引用
弱引用是Java中四档引用的第三档,比软引用更加弱一些,如果一个对象没有强引用链可达,那么一般活不过下一次GC。当某个ThreadLocal已经没有强引用可达,则随着它被垃圾回收,在ThreadLocalMap里对应的Entry的键值会失效,这为ThreadLocalMap本身的垃圾清理提供了便利。
ThreadLocal的get和set方法





public T get() {
Thread t = Thread.currentThread(); //获取当前线程
ThreadLocalMap map = getMap(t); // 获取当前线程对象的实例变量threadLocals
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
    }
  }
return setInitialValue();
}

set方法逻辑同get方法
public void set(T value) {
  Thread t = Thread.currentThread();
  ThreadLocalMap map = getMap(t);
  if (map != null)
    map.set(this, value);
  else
    createMap(t, value);
}


InheritableThreadLocal
InheritableThreadLocal继承ThreadLocal,ThreadLocal在线程内实现一个局部变量,可以在线程的任何地方来访问,能够减少参数的传递,InheritableThreadLocal在子线程和父线程之间共享线程实例本地变量,也同样是为了减少参数的传递。
ThreadLocal使用的是Thread的实例变量threadLocals;InheritableThreadLocal使用的是Thread的实例变量inheritableThreadLocals。这两个变量类型都为ThreadLocal.ThreadLocalMap,用途不一样。
InheritableThreadLocal的实现如下:
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue) {
return parentValue;
  }

ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
  }

void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
  }
}

InheritableThreadLocal重载了getMap和createMap方法,操作的不再是threadLocals,而是inheritableThreadLocals。
在Thread初始化时有以下逻辑。
if (parent.inheritableThreadLocals != null)
      this.inheritableThreadLocals =
        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

这个保证了inheritableThreadLocals变量从父Thread传递到子Thread。



Transmittable ThreadLocal(TTL)
ThreadLocal实现了线程本地变量隔离,每个线程可以拥有不会被别的线程干扰的线程值map。
InheritableThreadLocal继承ThreadLocal,InheritableThreadLocal可以在子线程和父线程之间共享线程实例本地变量。
JDK的InheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的
​​ThreadLocal​​值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的线程的​​ThreadLocal​​值传递到 任务执行时的Thread。
这种场景需要
​​TransmittableThreadLocal​​类继承并加强​​InheritableThreadLocal​​类,解决上述的问题。
框架/中间件集成
​​TTL(TransmittableThreadLocal)​​传递,通过​​TransmittableThreadLocal.Transmitter​​ 抓取当前线程的所有​​TTL​​值并在其他线程进行回放;在回放线程执行完业务操作后,恢复为回放线程原来的​​TTL​​值。
​​TransmittableThreadLocal.Transmitter​​提供了所有​​TTL​​值的抓取、回放和恢复方法(即​​CRR​​操作):




  • ​​capture​​方法:抓取线程(线程A)的所有​​TTL​​值。
  • ​​replay​​方法:在另一个线程(线程B)中,回放在​​capture​​方法中抓取的​​TTL​​值(回放的过程也就是把第一步抓取的线程实例本地变量设置到当前线程的实例本地变量中),并返回 回放前​​TTL​​值的备份
  • ​​restore​​方法:恢复线程B执行​​replay​​方法之前的​​TTL​​值(即备份,把第二步返回的回放前​​TTL​​值重新设置回池化中线程的实例本地变量)





整个过程的完整时序图
DSC0000.jpg



原理简析
对Runnable和Callable进行装饰,形成TtlRunnable和TtlCallable,在这两个装饰类里面主要多了一个引用对象(存放线程切换时线程的本地变量引用)。





private final AtomicReference<Object> capturedRef; //用户存放线程本地变量的map引用
再来看一下​​CRR​​操作的核心源码,CRR的代码逻辑时在TransmittableThreadLocal的静态内部类Transmitter中:
capture
public static Object capture() {
  Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
//holder是TransmittableThreadLocal类的静态对象,类型为InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>,用于存储线程的本地变量,这一份本地变量被TransmittableThreadLocal用于CRR操作
  for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
  }
return captured;
}

repaly

public static Object replay(Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL value only in captured
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
    }
  }
// 把capture抓取的线程本地变量设置到当前线程的本地变量中
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : capturedMap.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
  }
// call beforeExecute callback
doExecuteCallback(true);
return backup;
}

restore

public static void restore(Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);

for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();

// clear the TTL value only in backup
// avoid the extra value of backup after restore
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
    }
  }

// 把replay步骤备份前`TTL`值重新设置回池化中线程的本地变量
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : backupMap.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
  }
}

业务无感知
如果想让TTL框架更通用,对已经的业务场景进行无缝支持。需要把对Runnable、Callable、ExecuteService等池化类的装饰操作借助字节码在JVM启动时自动完成。
使用Java Agent来修饰JDK线程池实现类。这种方式实现线程池的传递是透明的,代码中没有修饰
​​Runnable​​或是线程池的代码。即可以做到应用代码 无侵入
关于 无侵入 的更多说明参见文档
​​Java Agent​​方式对应用代码无侵入。后面会写专门的文章介绍字节码增强这块的技术,这里就不展开了。
- END -
DSC0001.png

关注:fullstack888
学习架构知识
互联网后端架构
DSC0002.jpg



关注下面的标签,发现更多相似文章