评论

收藏

[Java] RxJava的消息发送和线程切换实现原理

编程语言 编程语言 发布于:2021-10-07 16:14 | 阅读数:549 | 评论:0

这篇文章主要介绍了RxJava的消息发送和线程切换实现原理,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
rxjava是一个在java虚拟机上的响应式扩展,通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。
它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。
rxjava相信大家都非常了解吧,今天分享一下rxjava的消息发送和线程源码的分析。最后并分享一个相关demo,让大家更加熟悉我们天天都在用的框架。
消息订阅发送
首先让我们看看消息订阅发送最基本的代码组成:
observable observable = observable.create(new observableonsubscribe<string>() {
   @override
   public void subscribe(observableemitter<string> emitter) throws exception {
     emitter.onnext("jack1");
     emitter.onnext("jack2");
     emitter.onnext("jack3");
     emitter.oncomplete();
   }
   });
 
   observer<string> observer = new observer<string>() {
   @override
   public void onsubscribe(disposable d) {
     log.d(tag, "onsubscribe");
   }
 
   @override
   public void onnext(string s) {
     log.d(tag, "onnext : " + s);
   }
 
   @override
   public void onerror(throwable e) {
     log.d(tag, "onerror : " + e.tostring());
   }
 
   @override
   public void oncomplete() {
     log.d(tag, "oncomplete");
   }
   };
 
   observable.subscribe(observer);
代码很简单,observable为被观察者,observer为观察者,然后通过observable.subscribe(observer),把观察者和被观察者关联起来。被观察者发送消息(emitter.onnext("内容")),观察者就可以在onnext()方法里回调出来。
我们先来看observable,创建是用observable.create()方法进行创建,源码如下:
public static <t> observable<t> create(observableonsubscribe<t> source) {
  objecthelper.requirenonnull(source, "source is null");
  return rxjavaplugins.onassembly(new observablecreate<t>(source));
}
 
public static <t> t requirenonnull(t object, string message) {
  if (object == null) {
   throw new nullpointerexception(message);
  }
  return object;
 }
 
public static <t> observable<t> onassembly(@nonnull observable<t> source) {
  function<? super observable, ? extends observable> f = onobservableassembly;
  if (f != null) {
   return apply(f, source);
  }
  return source;
}
可以看出,create()方法里最主要的还是创建用observableonsubscribe传入创建了一个observablecreate对象并且保存而已。
public final class observablecreate<t> extends observable<t> {
  final observableonsubscribe<t> source;
 
  public observablecreate(observableonsubscribe<t> source) {
  this.source = source;
  }
 
}
接着是创建observer,这比较简单只是单纯创建一个接口对象而已
public interface observer<t> {
  void onsubscribe(@nonnull disposable d);
 
  void onnext(@nonnull t t);
 
  void onerror(@nonnull throwable e);
  
  void oncomplete();
}
订阅发送消息
observable.subscribe(observer)的subscribe方法如下:
public final void subscribe(observer<? super t> observer) {
  objecthelper.requirenonnull(observer, "observer is null");
  try {
  observer = rxjavaplugins.onsubscribe(this, observer);
  objecthelper.requirenonnull(observer, "plugin returned null observer");
  subscribeactual(observer);
  } catch (nullpointerexception e) { // nopmd
  throw e;
  } catch (throwable e) {
  exceptions.throwiffatal(e);
  rxjavaplugins.onerror(e);
  nullpointerexception npe = new nullpointerexception("actually not, but can't throw other exceptions due to rs");
  npe.initcause(e);
  throw npe;
  }
}
 
//objecthelper.requirenonnull()方法
public static <t> t requirenonnull(t object, string message) {
  if (object == null) {
   throw new nullpointerexception(message);
  }
  return object;
}
 
//rxjavaplugins.onsubscribe()方法
public static <t> observer<? super t> onsubscribe(@nonnull observable<t> source, @nonnull observer<? super t> observer) {
  bifunction<? super observable, ? super observer, ? extends observer> f = onobservablesubscribe;
  if (f != null) {
  return apply(f, source, observer);
  }
  return observer;
}
从上面源码可以看出requirenonnull()只是做非空判断而已,而rxjavaplugins.onsubscribe()也只是返回最终的观察者而已。所以关键代码是抽象方法subscribeactual(observer);那么subscribeactual对应哪个代码段呢?
还记得observable.create()创建的observablecreate类吗,这就是subscribeactual()具体实现类,源码如下:
protected void subscribeactual(observer<? super t> observer) {
  createemitter<t> parent = new createemitter<t>(observer);
  observer.onsubscribe(parent);
  try {
  source.subscribe(parent);
  } catch (throwable ex) {
  exceptions.throwiffatal(ex);
  parent.onerror(ex);
  }
}
从上面的代码可以看出,首先创建了一个createemitter对象并传入observer,然后回到observer的onsubscribe()方法,而source就是我们之前创建observablecreate传入的observableonsubscribe对象。
class createemitter<t> extends atomicreference<disposable>
  implements observableemitter<t>, disposable {
 
 }
而createemitter又继承observableemitter接口,又回调observableonsubscribe的subscribe方法,对应着我们的:
observable observable = observable.create(new observableonsubscribe<string>() {
   @override
   public void subscribe(observableemitter<string> emitter) throws exception {
    emitter.onnext("jack1");
    emitter.onnext("jack2");
    emitter.onnext("jack3");
    emitter.oncomplete();
   }
});
当它发送消息既调用emitter.onnext()方法时,既调用了createemitter的onnext()方法:
public void onnext(t t) {
  if (t == null) {
  onerror(new nullpointerexception("onnext called with null. null values are generally not allowed in 2.x operators and sources."));
  return;
  }
  if (!isdisposed()) {
  observer.onnext(t);
  }
}
可以看到最终又回调了观察者的onnext()方法,把被观察者的数据传输给了观察者。有人会问
isdisposed()是什么意思,是判断要不要终止传递的,我们看emitter.oncomplete()源码:
public void oncomplete() {
  if (!isdisposed()) {
  try {
    observer.oncomplete();
  } finally {
    dispose();
  }
  }
}
 
public static boolean dispose(atomicreference<disposable> field) {
  disposable current = field.get();
  disposable d = disposed;
  if (current != d) {
    current = field.getandset(d);
    if (current != d) {
    if (current != null) {
      current.dispose();
    }
    return true;
    }
  }
  return false;
 }
 
public static boolean isdisposed(disposable d) {
  return d == disposed;
}
dispose()方法是终止消息传递,也就付了个disposed常量,而isdisposed()方法就是判断这个常量而已。这就是整个消息订阅发送的过程,用的是观察者模式。
线程切换
在上面模板代码的基础上,线程切换只是改变了如下代码:
observable.subscribeon(schedulers.io())
   .observeon(androidschedulers.mainthread())
   .subscribe(observer);
下面我们对线程切换的源码进行一下分析,分为两部分:subscribeon()和observeon()
subscribeon()
首先是subscribeon()源码如下:
public final observable<t> subscribeon(scheduler scheduler) {
  objecthelper.requirenonnull(scheduler, "scheduler is null");
  return rxjavaplugins.onassembly(new observablesubscribeon<t>(this, scheduler));
}
我们传进去了一个scheduler类,scheduler是一个调度类,能够延时或周期性地去执行一个任务。
scheduler有如下类型:
类型使用方式含义使用场景ioschedulerschedulers.io()io操作线程读写sd卡文件,查询数据库,访问网络等io密集型操作newthreadschedulerschedulers.newthread()创建新线程耗时操作等singleschedulerschedulers.single()单例线程只需一个单例线程时computationschedulerschedulers.computation()cpu计算操作线程图片压缩取样、xml,json解析等cpu密集型计算trampolineschedulerschedulers.trampoline()当前线程需要在当前线程立即执行任务时handlerschedulerandroidschedulers.mainthread()android主线程更新ui等接着就没什么了,只是返回一个observablesubscribeon对象而已。
observeon()
首先看源码如下:
public final observable<t> observeon(scheduler scheduler) {
  return observeon(scheduler, false, buffersize());
}
 
public final observable<t> observeon(scheduler scheduler, boolean delayerror, int buffersize) {
  objecthelper.requirenonnull(scheduler, "scheduler is null");
  objecthelper.verifypositive(buffersize, "buffersize");
  return rxjavaplugins.onassembly(new observableobserveon<t>(this, scheduler, delayerror, buffersize));
}
这里也是没什么,只是最终返回一个observableobserveon对象而已。
接着还是像原来那样调用subscribe()方法进行订阅,看起来好像整体变化不大,就是封装了一些对象而已,不过着恰恰是rxjava源码的精华,当他再次调用subscribeactual()方法时,已经不是之前的observablecreate()里subscribeactual方法了,而是最先调用observableobserveon的subscribeactual()方法,对应源码如下:
protected void subscribeactual(observer<? super t> observer) {
  if (scheduler instanceof trampolinescheduler) {
  source.subscribe(observer);
  } else {
  scheduler.worker w = scheduler.createworker();
  source.subscribe(new observeonobserver<t>(observer, w, delayerror, buffersize));
  }
}
在这里有两点要讲,一点是observeonobserver是执行观察者的线程,后面还会详解,然后就是source.subscribe,这个source.subscribe调的是observablesubscribeon的subscribe方法,而subscribe方法因为继承的也是observable,是observable里的方法,所以和上面的observablecreate一样的方法,所以会调用observablesubscribeon里的subscribeactual()方法,对应的代码如下:
public void subscribeactual(final observer<? super t> s) {
  final subscribeonobserver<t> parent = new subscribeonobserver<t>(s);
  s.onsubscribe(parent);
  parent.setdisposable(scheduler.scheduledirect(new subscribetask(parent)));
}
上面代码中,首先把observeonobserver返回给来的用subscribeonobserver“包装”起来,然后在回调observer的onsubscribe(),就是对应模板代码的onsubscribe()方法。
接着看subscribetask类的源码:
final class subscribetask implements runnable {
  private final subscribeonobserver<t> parent;
  subscribetask(subscribeonobserver<t> parent) {
  this.parent = parent;
  }
  @override
  public void run() {
  source.subscribe(parent);
  }
}
其中的source.subscribe(parent),就是我们执行子线程的回调方法,对应我们模板代码里的被观察者的subscribe()方法。它放在run()方法里,并且继承runnable,说明这个类主要是线程运行。接着看scheduler.scheduledirect()方法对应的源码如下:
public disposable scheduledirect(@nonnull runnable run) {
  return scheduledirect(run, 0l, timeunit.nanoseconds);
}
 
public disposable scheduledirect(@nonnull runnable run, long delay, @nonnull timeunit unit) {
  final worker w = createworker();
  final runnable decoratedrun = rxjavaplugins.onschedule(run);
  disposetask task = new disposetask(decoratedrun, w);
  w.schedule(task, delay, unit);
  return task;
}
在这里,createworker()也是一个抽象方法,调用的是我们的调度类对应的schedulers类里面的方法,这里是ioscheduler类,
public final class ioscheduler extends scheduler{
 
  final atomicreference<cachedworkerpool> pool;
 
  //省略....
 
  public worker createworker() {
  return new eventloopworker(pool.get());
  }
 
  static final class eventloopworker extends scheduler.worker {
  private final compositedisposable tasks;
  private final cachedworkerpool pool;
  private final threadworker threadworker;
 
  final atomicboolean once = new atomicboolean();
 
  eventloopworker(cachedworkerpool pool) {
    this.pool = pool;
    this.tasks = new compositedisposable();
    this.threadworker = pool.get();
  }
 
  //省略....
 
  @nonnull
  @override
  public disposable schedule(@nonnull runnable action, long delaytime, @nonnull timeunit unit) {
    if (tasks.isdisposed()) {
    // don't schedule, we are unsubscribed
    return emptydisposable.instance;
    }
    return threadworker.scheduleactual(action, delaytime, unit, tasks);
  }
  }
 
}
 
 static final class cachedworkerpool implements runnable {
 
  //省略....
 
  threadworker get() {
  if (allworkers.isdisposed()) {
    return shutdown_thread_worker;
  }
  while (!expiringworkerqueue.isempty()) {
    threadworker threadworker = expiringworkerqueue.poll();
    if (threadworker != null) {
    return threadworker;
    }
  }
 
  threadworker w = new threadworker(threadfactory);
  allworkers.add(w);
  return w;
   }
   //省略....
}
这就是ioscheduler的createworker()的方法,其实最主要的意思就是获取线程池,以便于生成子线程,让subscribetask()可以运行。然后直接调用 w.schedule(task, delay, unit)方法让它在线程池里执行。上面中那threadworker的源码如下:
static final class threadworker extends newthreadworker {
  private long expirationtime;
  threadworker(threadfactory threadfactory) {
  super(threadfactory);
  this.expirationtime = 0l;
  }
 
  //省略代码....
 }
 
public class newthreadworker extends scheduler.worker implements disposable {
  private final scheduledexecutorservice executor;
 
  public newthreadworker(threadfactory threadfactory) {
  executor = schedulerpoolfactory.create(threadfactory);
  }
 
  public scheduledrunnable scheduleactual(final runnable run, long delaytime, @nonnull timeunit unit, @nullable disposablecontainer parent) {
  runnable decoratedrun = rxjavaplugins.onschedule(run);
 
  scheduledrunnable sr = new scheduledrunnable(decoratedrun, parent);
 
  if (parent != null) {
    if (!parent.add(sr)) {
    return sr;
    }
  }
 
  future<?> f;
  try {
    if (delaytime <= 0) {
    f = executor.submit((callable<object>)sr);
    } else {
    f = executor.schedule((callable<object>)sr, delaytime, unit);
    }
    sr.setfuture(f);
  } catch (rejectedexecutionexception ex) {
    if (parent != null) {
    parent.remove(sr);
    }
    rxjavaplugins.onerror(ex);
  }
 
  return sr;
  }
}
可以看到,这就调了原始的javaapi来进行线程池操作。
然后最后一环在子线程调用source.subscribe(parent)方法,然后回调刚开始创建的observablecreate的subscribeactual(),既:
protected void subscribeactual(observer<? super t> observer) {
  createemitter<t> parent = new createemitter<t>(observer);
  observer.onsubscribe(parent);
  try {
    source.subscribe(parent);
  } catch (throwable ex) {
    exceptions.throwiffatal(ex);
    parent.onerror(ex);
  }
}
进行消息的订阅绑定。
当我们在调用 emitter.onnext(内容)时,是在io线程里的,那回调的onnext()又是什么时候切换的?那就是前面为了整个流程流畅性没讲的在observeon()里的observeonobserver是执行观察者的线程的过程。
class observeonobserver<t> extends basicintqueuedisposable<t>
  implements observer<t>, runnable {
 
  //省略代码....
 
  observeonobserver(observer<? super t> actual, scheduler.worker worker, boolean delayerror, int buffersize) {
    this.actual = actual;
    this.worker = worker;
    this.delayerror = delayerror;
    this.buffersize = buffersize;
  }
 
  @override
  public void onsubscribe(disposable s) {
    if (disposablehelper.validate(this.s, s)) {
    this.s = s;
    if (s instanceof queuedisposable) {
      @suppresswarnings("unchecked")
      queuedisposable<t> qd = (queuedisposable<t>) s;
      int m = qd.requestfusion(queuedisposable.any | queuedisposable.boundary);
      if (m == queuedisposable.sync) {
      sourcemode = m;
      queue = qd;
      done = true;
      actual.onsubscribe(this);
      schedule();
      return;
      }
      if (m == queuedisposable.async) {
      sourcemode = m;
      queue = qd;
      actual.onsubscribe(this);
      return;
      }
    }
    queue = new spsclinkedarrayqueue<t>(buffersize);
    actual.onsubscribe(this);
    }
  }
 
  @override
  public void onnext(t t) {
    if (done) {
    return;
    }
    if (sourcemode != queuedisposable.async) {
    queue.offer(t);
    }
    schedule();
  }  
 
  void schedule() {
    if (getandincrement() == 0) {
    worker.schedule(this);
    }
  }
  //省略代码....
  }
当调用emitter.onnext(内容)方法,会调用上面的onnext()方法,然后在这个方法里会把数据压入一个队列,然后执行worker.schedule(this)方法,work是什么呢,还记得androidschedulers.mainthread()吗,这个对应这个handlerscheduler这个类,所以createworker()对应着:
private static final class mainholder {
  static final scheduler default = new handlerscheduler(new handler(looper.getmainlooper()));
}
 
 
public worker createworker() {
  return new handlerworker(handler);
}
 
private static final class handlerworker extends worker {
  private final handler handler;
  private volatile boolean disposed;
 
  handlerworker(handler handler) {
    this.handler = handler;
  }
 
  @override
  public disposable schedule(runnable run, long delay, timeunit unit) {
    if (run == null) throw new nullpointerexception("run == null");
    if (unit == null) throw new nullpointerexception("unit == null");
    if (disposed) {
    return disposables.disposed();
    }
    run = rxjavaplugins.onschedule(run);
    scheduledrunnable scheduled = new scheduledrunnable(handler, run);
    message message = message.obtain(handler, scheduled);
    message.obj = this; // used as token for batch disposal of this worker's runnables.
    handler.sendmessagedelayed(message, unit.tomillis(delay));
    if (disposed) {
    handler.removecallbacks(scheduled);
    return disposables.disposed();
    }
    return scheduled;
  }
}
在next()方法里,运用android自带的handler消息机制,通过把方法包裹在message里,同通过handler.sendmessagedelayed()发送消息,就会在ui线程里回调next()方法,从而实现从子线程切换到android主线程的操作。我们在主线程拿到数据就可以进行各种在主线程的操作了。
总结一下:
DSC0000.png

observablecreate 一> observablesubscribeon 一> observableobserveon为初始化顺序
当调用observable.subscribe(observer)时的执行顺序
observableobserveon 一> observablesubscribeon 一> observablecreate
当发送消息的执行顺序
observablecreate 一> observablesubscribeon 一> observableobserveon
以上就是消息订阅和线程切换的源码的所有讲解了。
为了让你们理解更清楚,我仿照rxjava写了大概的消息订阅和线程切换的最基本代码和基本功能,以帮助你们理解
https://github.com/jack921/rxjava2demo
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持CodeAE代码之家
原文链接:https://www.jianshu.com/p/264b68fd96fa

关注下面的标签,发现更多相似文章