public class ConcurrentBagimplements AutoCloseable {private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
// 所有连接:通过CopyOnWriteArrayList + State + cas 来避免了上锁
private final CopyOnWriteArrayListsharedList;
// threadList是否使用弱引用
private final boolean weakThreadLocals;
// 归还的时候缓存空闲连接到 ThreadLocal:requite()、borrow()
private final ThreadLocal<1list> threadList;
private final IBagStateListener listener;
// 等待获取连接的线程数:调 borrow() 方法+1,调完-1
private final AtomicInteger waiters;
// 连接池关闭标识
private volatile boolean closed;
// 队列大小为0的阻塞队列:生产者消费者模式
private final SynchronousQueuehandoffQueue;
public interface IConcurrentBagEntry {
int STATE_NOT_IN_USE = 0; // 空闲
int STATE_IN_USE = 1; // 活跃
int STATE_REMOVED = -1; // 移除
int STATE_RESERVED = -2; // 不可用
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
public interface IBagStateListener {
void addBagItem(int waiting);
}
public ConcurrentBag(final IBagStateListener listener) {
this.listener = listener;
this.weakThreadLocals = useWeakThreadLocals();
this.handoffQueue = new SynchronousQueue<>(true);
this.waiters = new AtomicInteger();
this.sharedList = new CopyOnWriteArrayList<>();
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
} else {
this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
}
}
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
// Try the thread-local list first
// 先从 threadLocal 缓存中获取
final List list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
// 从尾部读取:后缓存的优先用,细节!
final Object entry = list.remove(i);
@SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// Otherwise, scan the shared list ... then poll the handoff queue
// 如果本地缓存获取不到,从 shardList 连接池中获取,等待连接数+1
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
// 并发情况下,保证能够及时补充连接
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
// 如果 shardList 连接池中也没获得连接,提交添加连接的异步任务,然后再从 handoffQueue 阻塞获取。
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
} finally {
// 等待连接数减 1
waiters.decrementAndGet();
}
}
public void requite(final T bagEntry) {
bagEntry.setState(STATE_NOT_IN_USE);
// 如果有线程正在获取链接,则优先通过 handoffQueue 阻塞队列归还给其他线程使用
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
} else if ((i & 0xff) == 0xff) {
// 每遍历 255 个休眠 10 微妙
parkNanos(MICROSECONDS.toNanos(10));
} else {
// 线程让步
yield();
}
}
// 没有其它线程用,就放入本地缓存
final List threadLocalList = threadList.get();
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
public void add(final T bagEntry) {
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
// 如果有线程等待获取连接,循环通过 handoffQueue 提交连接
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
yield();
}
}
public boolean remove(final T bagEntry) {
// 使用 CAS 将连接置为 STATE_REMOVED 状态
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
// CAS 成功后再删除连接
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
return removed;
}
@Override
public void close() {
closed = true;
}
public boolean reserve(final T bagEntry) {
return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
}
}